40 commits
c3bc6ed  Add tentative SparseTensor format (mrkn, Sep 11, 2018)
1f16ffe  Fix syntax error in SparseTensor.fbs (mrkn, Sep 12, 2018)
aa9b8a4  Add SparseTensor.fbs in FBS_SRC (mrkn, Sep 12, 2018)
866b2c1  Add header comments in SparseTensor.fbs (mrkn, Sep 12, 2018)
d7e653f  Add an example of COO format in comment (mrkn, Sep 17, 2018)
76c56dd  Make indptr of CSR a buffer (mrkn, Sep 17, 2018)
2b50040  Add an example of the CSR format in comment (mrkn, Sep 17, 2018)
c508db0  Write sparse tensor format in IPC.md (mrkn, Sep 24, 2018)
b24f3c3  Insert additional padding in sparse tensor format (mrkn, Sep 24, 2018)
392a25b  Implement SparseTensor and SparseCOOIndex (mrkn, Oct 1, 2018)
433c9b4  Change COO index matrix to column-major in a format description (mrkn, Nov 14, 2018)
4251b4d  Add SparseCSRIndex (mrkn, Nov 20, 2018)
ed3984d  Add SparseIndex::format_type (mrkn, Dec 5, 2018)
021b46b  Add SparseTensorBase (mrkn, Dec 5, 2018)
93c03ad  Add SparseIndex::ToString() (mrkn, Dec 6, 2018)
51a83bf  Add SparseTensorFormat (mrkn, Dec 7, 2018)
1d90427  Fix format (mrkn, Dec 7, 2018)
6bc9e29  Support IPC read and write of SparseTensor (mrkn, Dec 7, 2018)
b3a62eb  Fix format (mrkn, Dec 7, 2018)
d6a8c38  Unify Tensor.fbs and SparseTensor.fbs (mrkn, Dec 7, 2018)
3b1db7d  Add SparseTensorBase::Equals (mrkn, Dec 7, 2018)
9e457ac  Remove needless virtual specifiers (mrkn, Dec 9, 2018)
401ae80  Fix SparseCSRIndex::ToString and add tests (mrkn, Dec 9, 2018)
99b1d1d  Add missing ARROW_EXPORT specifiers (mrkn, Dec 8, 2018)
43d8eea  Fix coding style (mrkn, Dec 9, 2018)
357860d  Fix typo in comments (mrkn, Dec 13, 2018)
7e814de  Put EXPERIMENTAL markn in comments (mrkn, Dec 13, 2018)
f782303  Return Status::IOError instead of DCHECK if message header type is no… (mrkn, Dec 13, 2018)
ff3ea71  Rename length to non_zero_length in SparseTensor (mrkn, Dec 13, 2018)
6f29158  Mark APIs for sparse tensor as EXPERIMENTAL (mrkn, Jan 7, 2019)
6ef6ad0  Apply code formatter (mrkn, Jan 7, 2019)
3dd434c  Capitalize member function name (mrkn, Jan 9, 2019)
97e85bd  Use std::make_shared (mrkn, Jan 9, 2019)
37a0a14  Remove needless function declaration (mrkn, Jan 9, 2019)
07a6518  Use substitution instead of constructor call (mrkn, Jan 9, 2019)
90e8b31  Rename sparse tensor classes (mrkn, Jan 9, 2019)
c83ea6a  Add type aliases of sparse tensor types (mrkn, Jan 9, 2019)
880bbc4  Rename too-verbose function name (mrkn, Jan 9, 2019)
d57e56f  Merge sparse_tensor_format.h into sparse_tensor.h (mrkn, Jan 9, 2019)
148bff8  make format (mrkn, Jan 9, 2019)
2 changes: 2 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -83,6 +83,7 @@ set(ARROW_SRCS
table.cc
table_builder.cc
tensor.cc
sparse_tensor.cc
type.cc
visitor.cc

@@ -286,6 +287,7 @@ ADD_ARROW_TEST(type-test)
ADD_ARROW_TEST(table-test)
ADD_ARROW_TEST(table_builder-test)
ADD_ARROW_TEST(tensor-test)
ADD_ARROW_TEST(sparse_tensor-test)

ADD_ARROW_BENCHMARK(builder-benchmark)
ADD_ARROW_BENCHMARK(column-benchmark)
93 changes: 93 additions & 0 deletions cpp/src/arrow/compare.cc
@@ -30,6 +30,7 @@

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/sparse_tensor.h"
#include "arrow/status.h"
#include "arrow/tensor.h"
#include "arrow/type.h"
@@ -782,6 +783,98 @@ bool TensorEquals(const Tensor& left, const Tensor& right) {
return are_equal;
}

namespace {

template <typename LeftSparseIndexType, typename RightSparseIndexType>
struct SparseTensorEqualsImpl {
static bool Compare(const SparseTensorImpl<LeftSparseIndexType>& left,
const SparseTensorImpl<RightSparseIndexType>& right) {
// TODO(mrkn): should we support the equality among different formats?
Contributor (inline review comment): Yes
return false;
}
};

template <typename SparseIndexType>
struct SparseTensorEqualsImpl<SparseIndexType, SparseIndexType> {
static bool Compare(const SparseTensorImpl<SparseIndexType>& left,
const SparseTensorImpl<SparseIndexType>& right) {
DCHECK(left.type()->id() == right.type()->id());
DCHECK(left.shape() == right.shape());
DCHECK(left.non_zero_length() == right.non_zero_length());

const auto& left_index = checked_cast<const SparseIndexType&>(*left.sparse_index());
const auto& right_index = checked_cast<const SparseIndexType&>(*right.sparse_index());

if (!left_index.Equals(right_index)) {
return false;
}

const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type());
const int byte_width = size_meta.bit_width() / CHAR_BIT;
DCHECK_GT(byte_width, 0);

const uint8_t* left_data = left.data()->data();
const uint8_t* right_data = right.data()->data();

return memcmp(left_data, right_data,
static_cast<size_t>(byte_width * left.non_zero_length()));
}
};

template <typename SparseIndexType>
inline bool SparseTensorEqualsImplDispatch(const SparseTensorImpl<SparseIndexType>& left,
const SparseTensor& right) {
switch (right.format_id()) {
case SparseTensorFormat::COO: {
const auto& right_coo =
checked_cast<const SparseTensorImpl<SparseCOOIndex>&>(right);
return SparseTensorEqualsImpl<SparseIndexType, SparseCOOIndex>::Compare(left,
right_coo);
}

case SparseTensorFormat::CSR: {
const auto& right_csr =
checked_cast<const SparseTensorImpl<SparseCSRIndex>&>(right);
return SparseTensorEqualsImpl<SparseIndexType, SparseCSRIndex>::Compare(left,
right_csr);
}

default:
return false;
}
}

} // namespace

bool SparseTensorEquals(const SparseTensor& left, const SparseTensor& right) {
if (&left == &right) {
return true;
} else if (left.type()->id() != right.type()->id()) {
return false;
} else if (left.size() == 0) {
return true;
} else if (left.shape() != right.shape()) {
return false;
} else if (left.non_zero_length() != right.non_zero_length()) {
return false;
}

switch (left.format_id()) {
case SparseTensorFormat::COO: {
const auto& left_coo = checked_cast<const SparseTensorImpl<SparseCOOIndex>&>(left);
return SparseTensorEqualsImplDispatch(left_coo, right);
}

case SparseTensorFormat::CSR: {
const auto& left_csr = checked_cast<const SparseTensorImpl<SparseCSRIndex>&>(left);
return SparseTensorEqualsImplDispatch(left_csr, right);
}

default:
return false;
}
}

bool TypeEquals(const DataType& left, const DataType& right) {
bool are_equal;
// The arrays are the same object
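The equality path added above is a two-level dispatch: SparseTensorEquals switches on the left tensor's format_id, SparseTensorEqualsImplDispatch switches on the right's, and the templated SparseTensorEqualsImpl only performs the byte-wise comparison when both sides share the same index type (mixed formats currently compare unequal, per the TODO). A self-contained sketch of that pattern, using hypothetical IndexBase/CooIndex/CsrIndex classes rather than Arrow's types:

```cpp
#include <iostream>

// Hypothetical stand-ins for SparseCOOIndex / SparseCSRIndex; none of these
// names are Arrow APIs.
struct IndexBase {
  enum Format { COO, CSR };
  virtual ~IndexBase() = default;
  virtual Format format_id() const = 0;
};
struct CooIndex : IndexBase {
  Format format_id() const override { return COO; }
};
struct CsrIndex : IndexBase {
  Format format_id() const override { return CSR; }
};

// Primary template: different index types never compare equal
// (mirrors the primary SparseTensorEqualsImpl template).
template <typename Left, typename Right>
struct EqualsImpl {
  static bool Compare(const Left&, const Right&) { return false; }
};

// Partial specialization for matching index types: run the real comparison
// (trivially true in this sketch).
template <typename T>
struct EqualsImpl<T, T> {
  static bool Compare(const T&, const T&) { return true; }
};

// Second dispatch: the left side is already statically typed, switch on the
// right side's runtime format (mirrors SparseTensorEqualsImplDispatch).
template <typename Left>
bool DispatchRight(const Left& left, const IndexBase& right) {
  switch (right.format_id()) {
    case IndexBase::COO:
      return EqualsImpl<Left, CooIndex>::Compare(left,
                                                 static_cast<const CooIndex&>(right));
    case IndexBase::CSR:
      return EqualsImpl<Left, CsrIndex>::Compare(left,
                                                 static_cast<const CsrIndex&>(right));
  }
  return false;
}

// First dispatch: switch on the left side's runtime format
// (mirrors SparseTensorEquals).
bool IndexEquals(const IndexBase& left, const IndexBase& right) {
  switch (left.format_id()) {
    case IndexBase::COO:
      return DispatchRight(static_cast<const CooIndex&>(left), right);
    case IndexBase::CSR:
      return DispatchRight(static_cast<const CsrIndex&>(left), right);
  }
  return false;
}

int main() {
  CooIndex a, b;
  CsrIndex c;
  std::cout << IndexEquals(a, b) << "\n";  // 1: same format
  std::cout << IndexEquals(a, c) << "\n";  // 0: different formats
}
```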
4 changes: 4 additions & 0 deletions cpp/src/arrow/compare.h
@@ -29,12 +29,16 @@ namespace arrow {
class Array;
class DataType;
class Tensor;
class SparseTensor;

/// Returns true if the arrays are exactly equal
bool ARROW_EXPORT ArrayEquals(const Array& left, const Array& right);

bool ARROW_EXPORT TensorEquals(const Tensor& left, const Tensor& right);

/// EXPERIMENTAL: Returns true if the given sparse tensors are exactly equal
bool ARROW_EXPORT SparseTensorEquals(const SparseTensor& left, const SparseTensor& right);

/// Returns true if the arrays are approximately equal. For non-floating point
/// types, this is equivalent to ArrayEquals(left, right)
bool ARROW_EXPORT ArrayApproxEquals(const Array& left, const Array& right);
2 changes: 2 additions & 0 deletions cpp/src/arrow/ipc/message.cc
@@ -63,6 +63,8 @@ class Message::MessageImpl {
return Message::RECORD_BATCH;
case flatbuf::MessageHeader_Tensor:
return Message::TENSOR;
case flatbuf::MessageHeader_SparseTensor:
return Message::SPARSE_TENSOR;
default:
return Message::NONE;
}
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/message.h
@@ -70,7 +70,7 @@ constexpr int kMaxNestingDepth = 64;
/// \brief An IPC message including metadata and body
class ARROW_EXPORT Message {
public:
enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH, TENSOR };
enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH, TENSOR, SPARSE_TENSOR };

/// \brief Construct message, but do not validate
///
148 changes: 148 additions & 0 deletions cpp/src/arrow/ipc/metadata-internal.cc
@@ -31,6 +31,7 @@
#include "arrow/ipc/Tensor_generated.h" // IWYU pragma: keep
#include "arrow/ipc/message.h"
#include "arrow/ipc/util.h"
#include "arrow/sparse_tensor.h"
#include "arrow/status.h"
#include "arrow/tensor.h"
#include "arrow/type.h"
@@ -50,6 +51,7 @@ using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>;
using FieldOffset = flatbuffers::Offset<flatbuf::Field>;
using KeyValueOffset = flatbuffers::Offset<flatbuf::KeyValue>;
using RecordBatchOffset = flatbuffers::Offset<flatbuf::RecordBatch>;
using SparseTensorOffset = flatbuffers::Offset<flatbuf::SparseTensor>;
using Offset = flatbuffers::Offset<void>;
using FBString = flatbuffers::Offset<flatbuffers::String>;

@@ -781,6 +783,106 @@ Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset,
body_length, out);
}

Status MakeSparseTensorIndexCOO(FBB& fbb, const SparseCOOIndex& sparse_index,
const std::vector<BufferMetadata>& buffers,
flatbuf::SparseTensorIndex* fb_sparse_index_type,
Offset* fb_sparse_index, size_t* num_buffers) {
*fb_sparse_index_type = flatbuf::SparseTensorIndex_SparseTensorIndexCOO;
const BufferMetadata& indices_metadata = buffers[0];
flatbuf::Buffer indices(indices_metadata.offset, indices_metadata.length);
*fb_sparse_index = flatbuf::CreateSparseTensorIndexCOO(fbb, &indices).Union();
*num_buffers = 1;
return Status::OK();
}

Status MakeSparseMatrixIndexCSR(FBB& fbb, const SparseCSRIndex& sparse_index,
const std::vector<BufferMetadata>& buffers,
flatbuf::SparseTensorIndex* fb_sparse_index_type,
Offset* fb_sparse_index, size_t* num_buffers) {
*fb_sparse_index_type = flatbuf::SparseTensorIndex_SparseMatrixIndexCSR;
const BufferMetadata& indptr_metadata = buffers[0];
const BufferMetadata& indices_metadata = buffers[1];
flatbuf::Buffer indptr(indptr_metadata.offset, indptr_metadata.length);
flatbuf::Buffer indices(indices_metadata.offset, indices_metadata.length);
*fb_sparse_index = flatbuf::CreateSparseMatrixIndexCSR(fbb, &indptr, &indices).Union();
*num_buffers = 2;
return Status::OK();
}

Status MakeSparseTensorIndex(FBB& fbb, const SparseIndex& sparse_index,
const std::vector<BufferMetadata>& buffers,
flatbuf::SparseTensorIndex* fb_sparse_index_type,
Offset* fb_sparse_index, size_t* num_buffers) {
switch (sparse_index.format_id()) {
case SparseTensorFormat::COO:
RETURN_NOT_OK(MakeSparseTensorIndexCOO(
fbb, checked_cast<const SparseCOOIndex&>(sparse_index), buffers,
fb_sparse_index_type, fb_sparse_index, num_buffers));
break;

case SparseTensorFormat::CSR:
RETURN_NOT_OK(MakeSparseMatrixIndexCSR(
fbb, checked_cast<const SparseCSRIndex&>(sparse_index), buffers,
fb_sparse_index_type, fb_sparse_index, num_buffers));
break;

default:
std::stringstream ss;
ss << "Unsupporoted sparse tensor format:: " << sparse_index.ToString()
<< std::endl;
return Status::NotImplemented(ss.str());
}

return Status::OK();
}
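For context on what the two index writers above serialize, here is a minimal sketch of the COO and CSR index contents for a small matrix, using plain std::vector instead of Arrow buffers; it illustrates the standard formats only and does not reflect Arrow's exact in-memory layout (e.g. the row-/column-major choice discussed in the commits):

```cpp
// Conceptual contents of COO and CSR indices for the 3x4 matrix
//
//   | 1 0 0 2 |
//   | 0 0 3 0 |
//   | 0 4 0 0 |
//
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

int main() {
  // Non-zero values in row-major order of appearance.
  std::vector<double> data = {1, 2, 3, 4};

  // COO: one (row, column) coordinate pair per non-zero value.
  std::vector<std::pair<int64_t, int64_t>> coo_coords = {
      {0, 0}, {0, 3}, {1, 2}, {2, 1}};

  // CSR: indptr has (#rows + 1) entries; row i owns the half-open range
  // [indptr[i], indptr[i + 1]) of the indices/data arrays, and indices
  // stores the column of each non-zero value.
  std::vector<int64_t> csr_indptr = {0, 2, 3, 4};
  std::vector<int64_t> csr_indices = {0, 3, 2, 1};

  // Row 1 of the matrix has indptr[2] - indptr[1] == 1 non-zero value: 3 at column 2.
  std::cout << "row 1 non-zeros: " << csr_indptr[2] - csr_indptr[1] << "\n";
  std::cout << "value: " << data[csr_indptr[1]] << " at column "
            << csr_indices[csr_indptr[1]] << "\n";
}
```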

Status MakeSparseTensor(FBB& fbb, const SparseTensor& sparse_tensor, int64_t body_length,
const std::vector<BufferMetadata>& buffers,
SparseTensorOffset* offset) {
flatbuf::Type fb_type_type;
Offset fb_type;
RETURN_NOT_OK(
TensorTypeToFlatbuffer(fbb, *sparse_tensor.type(), &fb_type_type, &fb_type));

using TensorDimOffset = flatbuffers::Offset<flatbuf::TensorDim>;
std::vector<TensorDimOffset> dims;
for (int i = 0; i < sparse_tensor.ndim(); ++i) {
FBString name = fbb.CreateString(sparse_tensor.dim_name(i));
dims.push_back(flatbuf::CreateTensorDim(fbb, sparse_tensor.shape()[i], name));
}

auto fb_shape = fbb.CreateVector(dims);

flatbuf::SparseTensorIndex fb_sparse_index_type;
Offset fb_sparse_index;
size_t num_index_buffers = 0;
RETURN_NOT_OK(MakeSparseTensorIndex(fbb, *sparse_tensor.sparse_index(), buffers,
&fb_sparse_index_type, &fb_sparse_index,
&num_index_buffers));

const BufferMetadata& data_metadata = buffers[num_index_buffers];
flatbuf::Buffer data(data_metadata.offset, data_metadata.length);

const int64_t non_zero_length = sparse_tensor.non_zero_length();

*offset =
flatbuf::CreateSparseTensor(fbb, fb_type_type, fb_type, fb_shape, non_zero_length,
fb_sparse_index_type, fb_sparse_index, &data);

return Status::OK();
}

Status WriteSparseTensorMessage(const SparseTensor& sparse_tensor, int64_t body_length,
const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out) {
FBB fbb;
SparseTensorOffset fb_sparse_tensor;
RETURN_NOT_OK(
MakeSparseTensor(fbb, sparse_tensor, body_length, buffers, &fb_sparse_tensor));
return WriteFBMessage(fbb, flatbuf::MessageHeader_SparseTensor,
fb_sparse_tensor.Union(), body_length, out);
}
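MakeSparseTensorIndexCOO and MakeSparseMatrixIndexCSR consume the leading entries of the buffers vector and report how many they used, so MakeSparseTensor can pick the data buffer at buffers[num_index_buffers]. A minimal sketch of that ordering, with Buf as a hypothetical stand-in for BufferMetadata and made-up offsets/lengths:

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Buf is a stand-in for the writer's BufferMetadata; the offsets and lengths
// below are invented for illustration.
struct Buf {
  int64_t offset;
  int64_t length;
};

int main() {
  // COO sparse tensor: one index buffer (the coordinate matrix), then the
  // values, so num_index_buffers == 1 and the data buffer is buffers[1].
  std::vector<Buf> coo_buffers = {
      {0, 128},   // buffers[0]: COO coordinate matrix
      {128, 32},  // buffers[1]: non-zero values
  };

  // CSR sparse matrix: indptr and indices first, then the values,
  // so num_index_buffers == 2 and the data buffer is buffers[2].
  std::vector<Buf> csr_buffers = {
      {0, 40},   // buffers[0]: indptr
      {40, 32},  // buffers[1]: indices
      {72, 32},  // buffers[2]: non-zero values
  };

  std::cout << "COO data buffer at offset " << coo_buffers[1].offset << "\n";
  std::cout << "CSR data buffer at offset " << csr_buffers[2].offset << "\n";
}
```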

Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length,
const std::vector<FieldMetadata>& nodes,
const std::vector<BufferMetadata>& buffers,
@@ -933,6 +1035,52 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type
return TypeFromFlatbuffer(tensor->type_type(), tensor->type(), {}, type);
}

Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type,
std::vector<int64_t>* shape,
std::vector<std::string>* dim_names,
int64_t* non_zero_length,
SparseTensorFormat::type* sparse_tensor_format_id) {
auto message = flatbuf::GetMessage(metadata.data());
if (message->header_type() != flatbuf::MessageHeader_SparseTensor) {
return Status::IOError("Header of flatbuffer-encoded Message is not SparseTensor.");
}
if (message->header() == nullptr) {
return Status::IOError("Header-pointer of flatbuffer-encoded Message is null.");
}

auto sparse_tensor = reinterpret_cast<const flatbuf::SparseTensor*>(message->header());
int ndim = static_cast<int>(sparse_tensor->shape()->size());

for (int i = 0; i < ndim; ++i) {
auto dim = sparse_tensor->shape()->Get(i);

shape->push_back(dim->size());
auto fb_name = dim->name();
if (fb_name == 0) {
dim_names->push_back("");
} else {
dim_names->push_back(fb_name->str());
}
}

*non_zero_length = sparse_tensor->non_zero_length();

switch (sparse_tensor->sparseIndex_type()) {
case flatbuf::SparseTensorIndex_SparseTensorIndexCOO:
*sparse_tensor_format_id = SparseTensorFormat::COO;
break;

case flatbuf::SparseTensorIndex_SparseMatrixIndexCSR:
*sparse_tensor_format_id = SparseTensorFormat::CSR;
break;

default:
return Status::Invalid("Unrecognized sparse index type");
}

return TypeFromFlatbuffer(sparse_tensor->type_type(), sparse_tensor->type(), {}, type);
}
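The shape/dim_names loop in GetSparseTensorMetadata maps a missing dimension name to an empty string. A small sketch of the same logic with std::optional standing in for the flatbuffer accessors (the Dim struct is hypothetical, not part of Arrow):

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct Dim {
  int64_t size;
  std::optional<std::string> name;  // flatbuffers returns a null pointer when absent
};

int main() {
  std::vector<Dim> dims = {{3, "row"}, {4, std::nullopt}};

  std::vector<int64_t> shape;
  std::vector<std::string> dim_names;
  for (const auto& dim : dims) {
    shape.push_back(dim.size);
    // A missing name is stored as an empty string, as in GetSparseTensorMetadata.
    dim_names.push_back(dim.name.value_or(""));
  }

  std::cout << "ndim = " << shape.size() << ", first dim: " << shape[0] << " ("
            << dim_names[0] << ")\n";
}
```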

// ----------------------------------------------------------------------
// Implement message writing

12 changes: 12 additions & 0 deletions cpp/src/arrow/ipc/metadata-internal.h
@@ -33,13 +33,15 @@
#include "arrow/ipc/dictionary.h" // IYWU pragma: keep
#include "arrow/ipc/message.h"
#include "arrow/memory_pool.h"
#include "arrow/sparse_tensor.h"
#include "arrow/status.h"

namespace arrow {

class DataType;
class Schema;
class Tensor;
class SparseTensor;

namespace flatbuf = org::apache::arrow::flatbuf;

@@ -103,6 +105,12 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type
std::vector<int64_t>* shape, std::vector<int64_t>* strides,
std::vector<std::string>* dim_names);

// EXPERIMENTAL: Extracting metadata of a sparse tensor from the message
Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type,
std::vector<int64_t>* shape,
std::vector<std::string>* dim_names, int64_t* length,
SparseTensorFormat::type* sparse_tensor_format_id);

/// Write a serialized message metadata with a length-prefix and padding to an
/// 8-byte offset. Does not make assumptions about whether the stream is
/// aligned already
@@ -137,6 +145,10 @@ Status WriteRecordBatchMessage(const int64_t length, const int64_t body_length,
Status WriteTensorMessage(const Tensor& tensor, const int64_t buffer_start_offset,
std::shared_ptr<Buffer>* out);

Status WriteSparseTensorMessage(const SparseTensor& sparse_tensor, int64_t body_length,
const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);

Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
const std::vector<FileBlock>& record_batches,
DictionaryMemo* dictionary_memo, io::OutputStream* out);