diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index 63fc605f2b1..335a858dc35 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -45,8 +45,9 @@ function(ADD_ARROW_IPC_TEST REL_TEST_NAME) endfunction() add_arrow_test(feather_test) -add_arrow_ipc_test(read_write_test) add_arrow_ipc_test(json_simple_test) +add_arrow_ipc_test(read_write_test) +add_arrow_ipc_test(tensor_test) # Headers: top level arrow_install_all_headers("arrow/ipc") diff --git a/cpp/src/arrow/ipc/dictionary.cc b/cpp/src/arrow/ipc/dictionary.cc index bf760b0ecdc..31f5199e79b 100644 --- a/cpp/src/arrow/ipc/dictionary.cc +++ b/cpp/src/arrow/ipc/dictionary.cc @@ -236,10 +236,19 @@ Status DictionaryMemo::AddDictionaryDelta(int64_t id, return Status::OK(); } -Status DictionaryMemo::AddOrReplaceDictionary( +Result DictionaryMemo::AddOrReplaceDictionary( int64_t id, const std::shared_ptr& dictionary) { - impl_->id_to_dictionary_[id] = {dictionary}; - return Status::OK(); + ArrayDataVector value{dictionary}; + + auto pair = impl_->id_to_dictionary_.emplace(id, value); + if (pair.second) { + // Inserted + return true; + } else { + // Update existing value + pair.first->second = std::move(value); + return false; + } } // ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/dictionary.h b/cpp/src/arrow/ipc/dictionary.h index 3f3bd838cc2..263443a43cf 100644 --- a/cpp/src/arrow/ipc/dictionary.h +++ b/cpp/src/arrow/ipc/dictionary.h @@ -133,7 +133,10 @@ class ARROW_EXPORT DictionaryMemo { /// \brief Add a dictionary to the memo if it does not have one with the id, /// otherwise, replace the dictionary with the new one. - Status AddOrReplaceDictionary(int64_t id, const std::shared_ptr& dictionary); + /// + /// Return true if the dictionary was added, false if replaced. + Result AddOrReplaceDictionary(int64_t id, + const std::shared_ptr& dictionary); private: struct Impl; diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index ceb31561772..5d11c275082 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -39,9 +39,7 @@ #include "arrow/ipc/writer.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" -#include "arrow/sparse_tensor.h" #include "arrow/status.h" -#include "arrow/tensor.h" #include "arrow/testing/extension_type.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" @@ -914,6 +912,8 @@ TEST_F(RecursionLimits, StressLimit) { #endif // !defined(_WIN32) || defined(NDEBUG) struct FileWriterHelper { + static constexpr bool kIsFileFormat = true; + Status Init(const std::shared_ptr& schema, const IpcWriteOptions& options, const std::shared_ptr& metadata = nullptr) { num_batches_written_ = 0; @@ -938,11 +938,11 @@ struct FileWriterHelper { return sink_->Tell().Value(&footer_offset_); } - Status ReadBatches(const IpcReadOptions& options, BatchVector* out_batches) { + virtual Status ReadBatches(const IpcReadOptions& options, BatchVector* out_batches, + ReadStats* out_stats = nullptr) { auto buf_reader = std::make_shared(buffer_); - std::shared_ptr reader; - ARROW_ASSIGN_OR_RAISE( - reader, RecordBatchFileReader::Open(buf_reader.get(), footer_offset_, options)); + ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchFileReader::Open( + buf_reader.get(), footer_offset_, options)); EXPECT_EQ(num_batches_written_, reader->num_record_batches()); for (int i = 0; i < num_batches_written_; ++i) { @@ -950,7 +950,9 @@ struct FileWriterHelper { reader->ReadRecordBatch(i)); out_batches->push_back(chunk); } - + if (out_stats) { + *out_stats = reader->stats(); + } return Status::OK(); } @@ -983,6 +985,8 @@ struct FileWriterHelper { }; struct StreamWriterHelper { + static constexpr bool kIsFileFormat = false; + Status Init(const std::shared_ptr& schema, const IpcWriteOptions& options) { ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(0)); sink_.reset(new io::BufferOutputStream(buffer_)); @@ -1000,11 +1004,15 @@ struct StreamWriterHelper { return sink_->Close(); } - virtual Status ReadBatches(const IpcReadOptions& options, BatchVector* out_batches) { + virtual Status ReadBatches(const IpcReadOptions& options, BatchVector* out_batches, + ReadStats* out_stats = nullptr) { auto buf_reader = std::make_shared(buffer_); - std::shared_ptr reader; - ARROW_ASSIGN_OR_RAISE(reader, RecordBatchStreamReader::Open(buf_reader, options)) - return reader->ReadAll(out_batches); + ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchStreamReader::Open(buf_reader, options)) + RETURN_NOT_OK(reader->ReadAll(out_batches)); + if (out_stats) { + *out_stats = reader->stats(); + } + return Status::OK(); } Status ReadSchema(std::shared_ptr* out) { @@ -1026,7 +1034,11 @@ struct StreamWriterHelper { }; struct StreamDecoderWriterHelper : public StreamWriterHelper { - Status ReadBatches(const IpcReadOptions& options, BatchVector* out_batches) override { + Status ReadBatches(const IpcReadOptions& options, BatchVector* out_batches, + ReadStats* out_stats = nullptr) override { + if (out_stats) { + return Status::NotImplemented("StreamDecoder does not support stats()"); + } auto listener = std::make_shared(); StreamDecoder decoder(listener, options); RETURN_NOT_OK(DoConsume(&decoder)); @@ -1697,472 +1709,248 @@ TEST(TestRecordBatchStreamReader, NotEnoughDictionaries) { AssertFailsWith(truncated_stream, ex_message); } -class TestTensorRoundTrip : public ::testing::Test, public IpcTestFixture { - public: - void SetUp() { IpcTestFixture::SetUp(); } - void TearDown() { IpcTestFixture::TearDown(); } - - void CheckTensorRoundTrip(const Tensor& tensor) { - int32_t metadata_length; - int64_t body_length; - const int elem_size = GetByteWidth(*tensor.type()); - - ASSERT_OK(mmap_->Seek(0)); - - ASSERT_OK(WriteTensor(tensor, mmap_.get(), &metadata_length, &body_length)); - - const int64_t expected_body_length = elem_size * tensor.size(); - ASSERT_EQ(expected_body_length, body_length); - - ASSERT_OK(mmap_->Seek(0)); - - std::shared_ptr result; - ASSERT_OK_AND_ASSIGN(result, ReadTensor(mmap_.get())); - - ASSERT_EQ(result->data()->size(), expected_body_length); - ASSERT_TRUE(tensor.Equals(*result)); - } -}; - -TEST_F(TestTensorRoundTrip, BasicRoundtrip) { - std::string path = "test-write-tensor"; - constexpr int64_t kBufferSize = 1 << 20; - ASSERT_OK_AND_ASSIGN(mmap_, io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); - - std::vector shape = {4, 6}; - std::vector strides = {48, 8}; - std::vector dim_names = {"foo", "bar"}; - int64_t size = 24; - - std::vector values; - randint(size, 0, 100, &values); - - auto data = Buffer::Wrap(values); - - Tensor t0(int64(), data, shape, strides, dim_names); - Tensor t_no_dims(int64(), data, {}, {}, {}); - Tensor t_zero_length_dim(int64(), data, {0}, {8}, {"foo"}); +TEST(TestRecordBatchStreamReader, MalformedInput) { + const std::string empty_str = ""; + const std::string garbage_str = "12345678"; - CheckTensorRoundTrip(t0); - CheckTensorRoundTrip(t_no_dims); - CheckTensorRoundTrip(t_zero_length_dim); + auto empty = std::make_shared(empty_str); + auto garbage = std::make_shared(garbage_str); - int64_t serialized_size; - ASSERT_OK(GetTensorSize(t0, &serialized_size)); - ASSERT_TRUE(serialized_size > static_cast(size * sizeof(int64_t))); + io::BufferReader empty_reader(empty); + ASSERT_RAISES(Invalid, RecordBatchStreamReader::Open(&empty_reader)); - // ARROW-2840: Check that padding/alignment minded - std::vector shape_2 = {1, 1}; - std::vector strides_2 = {8, 8}; - Tensor t0_not_multiple_64(int64(), data, shape_2, strides_2, dim_names); - CheckTensorRoundTrip(t0_not_multiple_64); + io::BufferReader garbage_reader(garbage); + ASSERT_RAISES(Invalid, RecordBatchStreamReader::Open(&garbage_reader)); } -TEST_F(TestTensorRoundTrip, NonContiguous) { - std::string path = "test-write-tensor-strided"; - constexpr int64_t kBufferSize = 1 << 20; - ASSERT_OK_AND_ASSIGN(mmap_, io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); - - std::vector values; - randint(24, 0, 100, &values); - - auto data = Buffer::Wrap(values); - Tensor tensor(int64(), data, {4, 3}, {48, 16}); - - CheckTensorRoundTrip(tensor); +TEST(TestStreamDecoder, NextRequiredSize) { + auto listener = std::make_shared(); + StreamDecoder decoder(listener); + auto next_required_size = decoder.next_required_size(); + const uint8_t data[1] = {0}; + ASSERT_OK(decoder.Consume(data, 1)); + ASSERT_EQ(next_required_size - 1, decoder.next_required_size()); } -template -class TestSparseTensorRoundTrip : public ::testing::Test, public IpcTestFixture { +template +class TestDictionaryReplacement : public ::testing::Test { public: - void SetUp() { IpcTestFixture::SetUp(); } - void TearDown() { IpcTestFixture::TearDown(); } - - void CheckSparseCOOTensorRoundTrip(const SparseCOOTensor& sparse_tensor) { - const int elem_size = GetByteWidth(*sparse_tensor.type()); - const int index_elem_size = sizeof(typename IndexValueType::c_type); - - int32_t metadata_length; - int64_t body_length; - - ASSERT_OK(mmap_->Seek(0)); - - ASSERT_OK( - WriteSparseTensor(sparse_tensor, mmap_.get(), &metadata_length, &body_length)); + using WriterHelper = WriterHelperType; - const auto& sparse_index = - checked_cast(*sparse_tensor.sparse_index()); - const int64_t indices_length = - BitUtil::RoundUpToMultipleOf8(index_elem_size * sparse_index.indices()->size()); - const int64_t data_length = - BitUtil::RoundUpToMultipleOf8(elem_size * sparse_tensor.non_zero_length()); - const int64_t expected_body_length = indices_length + data_length; - ASSERT_EQ(expected_body_length, body_length); + void TestSameDictPointer() { + auto type = dictionary(int8(), utf8()); + auto values = ArrayFromJSON(utf8(), R"(["foo", "bar", "quux"])"); + auto batch1 = MakeBatch(type, ArrayFromJSON(int8(), "[0, 2, null, 1]"), values); + auto batch2 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 0, 0]"), values); + CheckRoundtrip({batch1, batch2}); + + EXPECT_EQ(read_stats_.num_messages, 4); // including schema message + EXPECT_EQ(read_stats_.num_record_batches, 2); + EXPECT_EQ(read_stats_.num_dictionary_batches, 1); + EXPECT_EQ(read_stats_.num_replaced_dictionaries, 0); + EXPECT_EQ(read_stats_.num_dictionary_deltas, 0); + } + + void TestSameDictValues() { + auto type = dictionary(int8(), utf8()); + // Create two separate dictionaries, but with the same contents + auto batch1 = MakeBatch(ArrayFromJSON(type, R"(["foo", "foo", "bar", null])")); + auto batch2 = MakeBatch(ArrayFromJSON(type, R"(["foo", "bar", "foo"])")); + CheckRoundtrip({batch1, batch2}); + + EXPECT_EQ(read_stats_.num_messages, 4); // including schema message + EXPECT_EQ(read_stats_.num_record_batches, 2); + EXPECT_EQ(read_stats_.num_dictionary_batches, 1); + EXPECT_EQ(read_stats_.num_replaced_dictionaries, 0); + EXPECT_EQ(read_stats_.num_dictionary_deltas, 0); + } + + void TestSameDictValuesNested() { + CheckRoundtrip(SameValuesNestedDictBatches()); + + EXPECT_EQ(read_stats_.num_messages, 5); // including schema message + EXPECT_EQ(read_stats_.num_record_batches, 2); + EXPECT_EQ(read_stats_.num_dictionary_batches, 2); + EXPECT_EQ(read_stats_.num_replaced_dictionaries, 0); + EXPECT_EQ(read_stats_.num_dictionary_deltas, 0); + } + + void TestDifferentDictValues() { + if (WriterHelper::kIsFileFormat) { + CheckWritingFails(DifferentOrderDictBatches(), 1); + CheckWritingFails(DifferentValuesDictBatches(), 1); + return; + } + CheckRoundtrip(DifferentOrderDictBatches()); - ASSERT_OK(mmap_->Seek(0)); + EXPECT_EQ(read_stats_.num_messages, 5); // including schema message + EXPECT_EQ(read_stats_.num_record_batches, 2); + EXPECT_EQ(read_stats_.num_dictionary_batches, 2); + EXPECT_EQ(read_stats_.num_replaced_dictionaries, 1); + EXPECT_EQ(read_stats_.num_dictionary_deltas, 0); - std::shared_ptr result; - ASSERT_OK_AND_ASSIGN(result, ReadSparseTensor(mmap_.get())); - ASSERT_EQ(SparseTensorFormat::COO, result->format_id()); + CheckRoundtrip(DifferentValuesDictBatches()); - const auto& resulted_sparse_index = - checked_cast(*result->sparse_index()); - ASSERT_EQ(resulted_sparse_index.indices()->data()->size(), indices_length); - ASSERT_EQ(resulted_sparse_index.is_canonical(), sparse_index.is_canonical()); - ASSERT_EQ(result->data()->size(), data_length); - ASSERT_TRUE(result->Equals(sparse_tensor)); + EXPECT_EQ(read_stats_.num_messages, 5); // including schema message + EXPECT_EQ(read_stats_.num_record_batches, 2); + EXPECT_EQ(read_stats_.num_dictionary_batches, 2); + EXPECT_EQ(read_stats_.num_replaced_dictionaries, 1); + EXPECT_EQ(read_stats_.num_dictionary_deltas, 0); } - template - void CheckSparseCSXMatrixRoundTrip( - const SparseTensorImpl& sparse_tensor) { - static_assert(std::is_same::value || - std::is_same::value, - "SparseIndexType must be either SparseCSRIndex or SparseCSCIndex"); - - const int elem_size = GetByteWidth(*sparse_tensor.type()); - const int index_elem_size = sizeof(typename IndexValueType::c_type); - - int32_t metadata_length; - int64_t body_length; - - ASSERT_OK(mmap_->Seek(0)); - - ASSERT_OK( - WriteSparseTensor(sparse_tensor, mmap_.get(), &metadata_length, &body_length)); - - const auto& sparse_index = - checked_cast(*sparse_tensor.sparse_index()); - const int64_t indptr_length = - BitUtil::RoundUpToMultipleOf8(index_elem_size * sparse_index.indptr()->size()); - const int64_t indices_length = - BitUtil::RoundUpToMultipleOf8(index_elem_size * sparse_index.indices()->size()); - const int64_t data_length = - BitUtil::RoundUpToMultipleOf8(elem_size * sparse_tensor.non_zero_length()); - const int64_t expected_body_length = indptr_length + indices_length + data_length; - ASSERT_EQ(expected_body_length, body_length); - - ASSERT_OK(mmap_->Seek(0)); + void TestDifferentDictValuesNested() { + if (WriterHelper::kIsFileFormat) { + CheckWritingFails(DifferentValuesNestedDictBatches1(), 1); + CheckWritingFails(DifferentValuesNestedDictBatches2(), 1); + return; + } + CheckRoundtrip(DifferentValuesNestedDictBatches1()); - std::shared_ptr result; - ASSERT_OK_AND_ASSIGN(result, ReadSparseTensor(mmap_.get())); + EXPECT_EQ(read_stats_.num_messages, 7); // including schema message + EXPECT_EQ(read_stats_.num_record_batches, 2); + // Both inner and outer dict were replaced + EXPECT_EQ(read_stats_.num_dictionary_batches, 4); + EXPECT_EQ(read_stats_.num_replaced_dictionaries, 2); + EXPECT_EQ(read_stats_.num_dictionary_deltas, 0); - constexpr auto expected_format_id = - std::is_same::value ? SparseTensorFormat::CSR - : SparseTensorFormat::CSC; - ASSERT_EQ(expected_format_id, result->format_id()); + CheckRoundtrip(DifferentValuesNestedDictBatches2()); - const auto& resulted_sparse_index = - checked_cast(*result->sparse_index()); - ASSERT_EQ(resulted_sparse_index.indptr()->data()->size(), indptr_length); - ASSERT_EQ(resulted_sparse_index.indices()->data()->size(), indices_length); - ASSERT_EQ(result->data()->size(), data_length); - ASSERT_TRUE(result->Equals(sparse_tensor)); + EXPECT_EQ(read_stats_.num_messages, 6); // including schema message + EXPECT_EQ(read_stats_.num_record_batches, 2); + // Only inner dict was replaced + EXPECT_EQ(read_stats_.num_dictionary_batches, 3); + EXPECT_EQ(read_stats_.num_replaced_dictionaries, 1); + EXPECT_EQ(read_stats_.num_dictionary_deltas, 0); } - void CheckSparseCSFTensorRoundTrip(const SparseCSFTensor& sparse_tensor) { - const int elem_size = GetByteWidth(*sparse_tensor.type()); - const int index_elem_size = sizeof(typename IndexValueType::c_type); - - int32_t metadata_length; - int64_t body_length; - - ASSERT_OK(mmap_->Seek(0)); - - ASSERT_OK( - WriteSparseTensor(sparse_tensor, mmap_.get(), &metadata_length, &body_length)); - - const auto& sparse_index = - checked_cast(*sparse_tensor.sparse_index()); - - const int64_t ndim = sparse_index.axis_order().size(); - int64_t indptr_length = 0; - int64_t indices_length = 0; - - for (int64_t i = 0; i < ndim - 1; ++i) { - indptr_length += BitUtil::RoundUpToMultipleOf8(index_elem_size * - sparse_index.indptr()[i]->size()); + Status RoundTrip(const BatchVector& in_batches, BatchVector* out_batches) { + WriterHelper writer_helper; + RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema(), write_options_)); + for (const auto& batch : in_batches) { + RETURN_NOT_OK(writer_helper.WriteBatch(batch)); } - for (int64_t i = 0; i < ndim; ++i) { - indices_length += BitUtil::RoundUpToMultipleOf8(index_elem_size * - sparse_index.indices()[i]->size()); + RETURN_NOT_OK(writer_helper.Finish()); + RETURN_NOT_OK(writer_helper.ReadBatches(read_options_, out_batches, &read_stats_)); + for (const auto& batch : *out_batches) { + RETURN_NOT_OK(batch->ValidateFull()); } - const int64_t data_length = - BitUtil::RoundUpToMultipleOf8(elem_size * sparse_tensor.non_zero_length()); - const int64_t expected_body_length = indptr_length + indices_length + data_length; - ASSERT_EQ(expected_body_length, body_length); - - ASSERT_OK(mmap_->Seek(0)); - - std::shared_ptr result; - ASSERT_OK_AND_ASSIGN(result, ReadSparseTensor(mmap_.get())); - ASSERT_EQ(SparseTensorFormat::CSF, result->format_id()); - - const auto& resulted_sparse_index = - checked_cast(*result->sparse_index()); + return Status::OK(); + } - int64_t out_indptr_length = 0; - int64_t out_indices_length = 0; - for (int i = 0; i < ndim - 1; ++i) { - out_indptr_length += BitUtil::RoundUpToMultipleOf8( - index_elem_size * resulted_sparse_index.indptr()[i]->size()); - } - for (int i = 0; i < ndim; ++i) { - out_indices_length += BitUtil::RoundUpToMultipleOf8( - index_elem_size * resulted_sparse_index.indices()[i]->size()); + void CheckRoundtrip(const BatchVector& in_batches) { + BatchVector out_batches; + ASSERT_OK(RoundTrip(in_batches, &out_batches)); + ASSERT_EQ(in_batches.size(), out_batches.size()); + for (size_t i = 0; i < in_batches.size(); ++i) { + AssertBatchesEqual(*in_batches[i], *out_batches[i]); } - - ASSERT_EQ(out_indptr_length, indptr_length); - ASSERT_EQ(out_indices_length, indices_length); - ASSERT_EQ(result->data()->size(), data_length); - ASSERT_TRUE(resulted_sparse_index.Equals(sparse_index)); - ASSERT_TRUE(result->Equals(sparse_tensor)); } - protected: - std::shared_ptr MakeSparseCOOIndex( - const std::vector& coords_shape, - const std::vector& coords_strides, - std::vector& coords_values) const { - auto coords_data = Buffer::Wrap(coords_values); - auto coords = std::make_shared>( - coords_data, coords_shape, coords_strides); - return std::make_shared(coords); - } - - template - Result> MakeSparseCOOTensor( - const std::shared_ptr& si, std::vector& sparse_values, - const std::vector& shape, - const std::vector& dim_names = {}) const { - auto data = Buffer::Wrap(sparse_values); - return SparseCOOTensor::Make(si, CTypeTraits::type_singleton(), data, - shape, dim_names); + void CheckWritingFails(const BatchVector& in_batches, size_t fails_at_batch_num) { + WriterHelper writer_helper; + ASSERT_OK(writer_helper.Init(in_batches[0]->schema(), write_options_)); + for (size_t i = 0; i < fails_at_batch_num; ++i) { + ASSERT_OK(writer_helper.WriteBatch(in_batches[i])); + } + ASSERT_RAISES(Invalid, writer_helper.WriteBatch(in_batches[fails_at_batch_num])); } -}; - -TYPED_TEST_SUITE_P(TestSparseTensorRoundTrip); - -TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCOOIndexRowMajor) { - using IndexValueType = TypeParam; - using c_index_value_type = typename IndexValueType::c_type; - - std::string path = "test-write-sparse-coo-tensor"; - constexpr int64_t kBufferSize = 1 << 20; - ASSERT_OK_AND_ASSIGN(this->mmap_, - io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); - - // Dense representation: - // [ - // [ - // 1 0 2 0 - // 0 3 0 4 - // 5 0 6 0 - // ], - // [ - // 0 11 0 12 - // 13 0 14 0 - // 0 15 0 16 - // ] - // ] - // - // Sparse representation: - // idx[0] = [0 0 0 0 0 0 1 1 1 1 1 1] - // idx[1] = [0 0 1 1 2 2 0 0 1 1 2 2] - // idx[2] = [0 2 1 3 0 2 1 3 0 2 1 3] - // data = [1 2 3 4 5 6 11 12 13 14 15 16] - - // canonical - std::vector coords_values = {0, 0, 0, 0, 0, 2, 0, 1, 1, 0, 1, 3, - 0, 2, 0, 0, 2, 2, 1, 0, 1, 1, 0, 3, - 1, 1, 0, 1, 1, 2, 1, 2, 1, 1, 2, 3}; - const int sizeof_index_value = sizeof(c_index_value_type); - std::shared_ptr si; - ASSERT_OK_AND_ASSIGN( - si, SparseCOOIndex::Make(TypeTraits::type_singleton(), {12, 3}, - {sizeof_index_value * 3, sizeof_index_value}, - Buffer::Wrap(coords_values))); - ASSERT_TRUE(si->is_canonical()); - - std::vector shape = {2, 3, 4}; - std::vector dim_names = {"foo", "bar", "baz"}; - std::vector values = {1, 2, 3, 4, 5, 6, 11, 12, 13, 14, 15, 16}; - std::shared_ptr st; - ASSERT_OK_AND_ASSIGN(st, this->MakeSparseCOOTensor(si, values, shape, dim_names)); - - this->CheckSparseCOOTensorRoundTrip(*st); - - // non-canonical - ASSERT_OK_AND_ASSIGN( - si, SparseCOOIndex::Make(TypeTraits::type_singleton(), {12, 3}, - {sizeof_index_value * 3, sizeof_index_value}, - Buffer::Wrap(coords_values), false)); - ASSERT_FALSE(si->is_canonical()); - ASSERT_OK_AND_ASSIGN(st, this->MakeSparseCOOTensor(si, values, shape, dim_names)); - this->CheckSparseCOOTensorRoundTrip(*st); -} + BatchVector DifferentOrderDictBatches() { + // Create two separate dictionaries with different order + auto type = dictionary(int8(), utf8()); + auto batch1 = MakeBatch(ArrayFromJSON(type, R"(["foo", "foo", "bar", null])")); + auto batch2 = MakeBatch(ArrayFromJSON(type, R"(["bar", "bar", "foo"])")); + return {batch1, batch2}; + } -TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCOOIndexColumnMajor) { - using IndexValueType = TypeParam; - using c_index_value_type = typename IndexValueType::c_type; - - std::string path = "test-write-sparse-coo-tensor"; - constexpr int64_t kBufferSize = 1 << 20; - ASSERT_OK_AND_ASSIGN(this->mmap_, - io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); - - // Dense representation: - // [ - // [ - // 1 0 2 0 - // 0 3 0 4 - // 5 0 6 0 - // ], - // [ - // 0 11 0 12 - // 13 0 14 0 - // 0 15 0 16 - // ] - // ] - // - // Sparse representation: - // idx[0] = [0 0 0 0 0 0 1 1 1 1 1 1] - // idx[1] = [0 0 1 1 2 2 0 0 1 1 2 2] - // idx[2] = [0 2 1 3 0 2 1 3 0 2 1 3] - // data = [1 2 3 4 5 6 11 12 13 14 15 16] - - // canonical - std::vector coords_values = {0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, - 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, - 0, 2, 1, 3, 0, 2, 1, 3, 0, 2, 1, 3}; - const int sizeof_index_value = sizeof(c_index_value_type); - std::shared_ptr si; - ASSERT_OK_AND_ASSIGN( - si, SparseCOOIndex::Make(TypeTraits::type_singleton(), {12, 3}, - {sizeof_index_value, sizeof_index_value * 12}, - Buffer::Wrap(coords_values))); - ASSERT_TRUE(si->is_canonical()); + BatchVector DifferentValuesDictBatches() { + // Create two separate dictionaries with different values + auto type = dictionary(int8(), utf8()); + auto batch1 = MakeBatch(ArrayFromJSON(type, R"(["foo", "foo", "bar", null])")); + auto batch2 = MakeBatch(ArrayFromJSON(type, R"(["bar", "quux", "quux"])")); + return {batch1, batch2}; + } - std::vector shape = {2, 3, 4}; - std::vector dim_names = {"foo", "bar", "baz"}; - std::vector values = {1, 2, 3, 4, 5, 6, 11, 12, 13, 14, 15, 16}; + BatchVector SameValuesNestedDictBatches() { + auto value_type = list(dictionary(int8(), utf8())); + auto type = dictionary(int8(), value_type); + auto batch1_values = ArrayFromJSON(value_type, R"([[], ["a"], ["b"], ["a", "a"]])"); + auto batch2_values = ArrayFromJSON(value_type, R"([[], ["a"], ["b"], ["a", "a"]])"); + auto batch1 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 3, 0, 3]"), batch1_values); + auto batch2 = MakeBatch(type, ArrayFromJSON(int8(), "[2, null, 2]"), batch2_values); + return {batch1, batch2}; + } - std::shared_ptr st; - ASSERT_OK_AND_ASSIGN(st, this->MakeSparseCOOTensor(si, values, shape, dim_names)); + BatchVector DifferentValuesNestedDictBatches1() { + // Inner dictionary values differ + auto value_type = list(dictionary(int8(), utf8())); + auto type = dictionary(int8(), value_type); + auto batch1_values = ArrayFromJSON(value_type, R"([[], ["a"], ["b"], ["a", "a"]])"); + auto batch2_values = ArrayFromJSON(value_type, R"([[], ["a"], ["c"], ["a", "a"]])"); + auto batch1 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 3, 0, 3]"), batch1_values); + auto batch2 = MakeBatch(type, ArrayFromJSON(int8(), "[2, null, 2]"), batch2_values); + return {batch1, batch2}; + } - this->CheckSparseCOOTensorRoundTrip(*st); + BatchVector DifferentValuesNestedDictBatches2() { + // Outer dictionary values differ + auto value_type = list(dictionary(int8(), utf8())); + auto type = dictionary(int8(), value_type); + auto batch1_values = ArrayFromJSON(value_type, R"([[], ["a"], ["b"], ["a", "a"]])"); + auto batch2_values = ArrayFromJSON(value_type, R"([["a"], ["b"], ["a", "a"]])"); + auto batch1 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 3, 0, 3]"), batch1_values); + auto batch2 = MakeBatch(type, ArrayFromJSON(int8(), "[2, null, 2]"), batch2_values); + return {batch1, batch2}; + } - // non-canonical - ASSERT_OK_AND_ASSIGN( - si, SparseCOOIndex::Make(TypeTraits::type_singleton(), {12, 3}, - {sizeof_index_value, sizeof_index_value * 12}, - Buffer::Wrap(coords_values), false)); - ASSERT_FALSE(si->is_canonical()); - ASSERT_OK_AND_ASSIGN(st, this->MakeSparseCOOTensor(si, values, shape, dim_names)); + // Make one-column batch + std::shared_ptr MakeBatch(std::shared_ptr column) { + return RecordBatch::Make(schema({field("f", column->type())}), column->length(), + {column}); + } - this->CheckSparseCOOTensorRoundTrip(*st); -} + // Make one-column batch with a dictionary array + std::shared_ptr MakeBatch(std::shared_ptr type, + std::shared_ptr indices, + std::shared_ptr dictionary) { + auto array = *DictionaryArray::FromArrays(std::move(type), std::move(indices), + std::move(dictionary)); + return MakeBatch(std::move(array)); + } -TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCSRIndex) { - using IndexValueType = TypeParam; + protected: + IpcWriteOptions write_options_ = IpcWriteOptions::Defaults(); + IpcReadOptions read_options_ = IpcReadOptions::Defaults(); + ReadStats read_stats_; +}; - std::string path = "test-write-sparse-csr-matrix"; - constexpr int64_t kBufferSize = 1 << 20; - ASSERT_OK_AND_ASSIGN(this->mmap_, - io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); +TYPED_TEST_SUITE_P(TestDictionaryReplacement); - std::vector shape = {4, 6}; - std::vector dim_names = {"foo", "bar", "baz"}; - std::vector values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, - 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; +TYPED_TEST_P(TestDictionaryReplacement, SameDictPointer) { this->TestSameDictPointer(); } - auto data = Buffer::Wrap(values); - NumericTensor t(data, shape, {}, dim_names); - std::shared_ptr st; - ASSERT_OK_AND_ASSIGN( - st, SparseCSRMatrix::Make(t, TypeTraits::type_singleton())); +TYPED_TEST_P(TestDictionaryReplacement, SameDictValues) { this->TestSameDictValues(); } - this->CheckSparseCSXMatrixRoundTrip(*st); +TYPED_TEST_P(TestDictionaryReplacement, SameDictValuesNested) { + this->TestSameDictValuesNested(); } -TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCSCIndex) { - using IndexValueType = TypeParam; - - std::string path = "test-write-sparse-csc-matrix"; - constexpr int64_t kBufferSize = 1 << 20; - ASSERT_OK_AND_ASSIGN(this->mmap_, - io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); - - std::vector shape = {4, 6}; - std::vector dim_names = {"foo", "bar", "baz"}; - std::vector values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, - 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; - - auto data = Buffer::Wrap(values); - NumericTensor t(data, shape, {}, dim_names); - std::shared_ptr st; - ASSERT_OK_AND_ASSIGN( - st, SparseCSCMatrix::Make(t, TypeTraits::type_singleton())); - - this->CheckSparseCSXMatrixRoundTrip(*st); +TYPED_TEST_P(TestDictionaryReplacement, DifferentDictValues) { + this->TestDifferentDictValues(); } -TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCSFIndex) { - using IndexValueType = TypeParam; - - std::string path = "test-write-sparse-csf-tensor"; - constexpr int64_t kBufferSize = 1 << 20; - ASSERT_OK_AND_ASSIGN(this->mmap_, - io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); - - std::vector shape = {4, 6}; - std::vector dim_names = {"foo", "bar", "baz"}; - std::vector values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, - 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; - - auto data = Buffer::Wrap(values); - NumericTensor t(data, shape, {}, dim_names); - std::shared_ptr st; - ASSERT_OK_AND_ASSIGN( - st, SparseCSFTensor::Make(t, TypeTraits::type_singleton())); - - this->CheckSparseCSFTensorRoundTrip(*st); +TYPED_TEST_P(TestDictionaryReplacement, DifferentDictValuesNested) { + this->TestDifferentDictValuesNested(); } -REGISTER_TYPED_TEST_SUITE_P(TestSparseTensorRoundTrip, WithSparseCOOIndexRowMajor, - WithSparseCOOIndexColumnMajor, WithSparseCSRIndex, - WithSparseCSCIndex, WithSparseCSFIndex); - -INSTANTIATE_TYPED_TEST_SUITE_P(TestInt8, TestSparseTensorRoundTrip, Int8Type); -INSTANTIATE_TYPED_TEST_SUITE_P(TestUInt8, TestSparseTensorRoundTrip, UInt8Type); -INSTANTIATE_TYPED_TEST_SUITE_P(TestInt16, TestSparseTensorRoundTrip, Int16Type); -INSTANTIATE_TYPED_TEST_SUITE_P(TestUInt16, TestSparseTensorRoundTrip, UInt16Type); -INSTANTIATE_TYPED_TEST_SUITE_P(TestInt32, TestSparseTensorRoundTrip, Int32Type); -INSTANTIATE_TYPED_TEST_SUITE_P(TestUInt32, TestSparseTensorRoundTrip, UInt32Type); -INSTANTIATE_TYPED_TEST_SUITE_P(TestInt64, TestSparseTensorRoundTrip, Int64Type); - -TEST(TestRecordBatchStreamReader, MalformedInput) { - const std::string empty_str = ""; - const std::string garbage_str = "12345678"; - auto empty = std::make_shared(empty_str); - auto garbage = std::make_shared(garbage_str); +REGISTER_TYPED_TEST_SUITE_P(TestDictionaryReplacement, SameDictPointer, SameDictValues, + SameDictValuesNested, DifferentDictValues, + DifferentDictValuesNested); - io::BufferReader empty_reader(empty); - ASSERT_RAISES(Invalid, RecordBatchStreamReader::Open(&empty_reader)); - - io::BufferReader garbage_reader(garbage); - ASSERT_RAISES(Invalid, RecordBatchStreamReader::Open(&garbage_reader)); -} +using DictionaryReplacementTestTypes = + ::testing::Types; -TEST(TestStreamDecoder, NextRequiredSize) { - auto listener = std::make_shared(); - StreamDecoder decoder(listener); - auto next_required_size = decoder.next_required_size(); - const uint8_t data[1] = {0}; - ASSERT_OK(decoder.Consume(data, 1)); - ASSERT_EQ(next_required_size - 1, decoder.next_required_size()); -} +INSTANTIATE_TYPED_TEST_SUITE_P(TestDictionaryReplacement, TestDictionaryReplacement, + DictionaryReplacementTestTypes); // ---------------------------------------------------------------------- // Miscellanea diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index a1b6ba4c0db..92f2b70f294 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -72,6 +72,8 @@ using internal::kArrowMagicBytes; namespace { +enum class DictionaryKind { New, Delta, Replacement }; + Status InvalidMessageType(MessageType expected, MessageType actual) { return Status::IOError("Expected IPC message of type ", FormatMessageType(expected), " but got ", FormatMessageType(actual)); @@ -669,17 +671,18 @@ Result> ReadRecordBatch( } Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo, - const IpcReadOptions& options, io::RandomAccessFile* file) { + const IpcReadOptions& options, DictionaryKind* kind, + io::RandomAccessFile* file) { const flatbuf::Message* message = nullptr; RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &message)); - auto dictionary_batch = message->header_as_DictionaryBatch(); + const auto dictionary_batch = message->header_as_DictionaryBatch(); if (dictionary_batch == nullptr) { return Status::IOError( "Header-type of flatbuffer-encoded Message is not DictionaryBatch."); } // The dictionary is embedded in a record batch with a single column - auto batch_meta = dictionary_batch->data(); + const auto batch_meta = dictionary_batch->data(); CHECK_FLATBUFFERS_NOT_NULL(batch_meta, "DictionaryBatch.data"); @@ -692,7 +695,7 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo, RETURN_NOT_OK(GetCompressionExperimental(message, &compression)); } - int64_t id = dictionary_batch->id(); + const int64_t id = dictionary_batch->id(); // Look up the dictionary value type, which must have been added to the // DictionaryMemo already prior to invoking this function @@ -701,8 +704,8 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo, // Load the dictionary data from the dictionary batch ArrayLoader loader(batch_meta, internal::GetMetadataVersion(message->version()), options, file); - auto dict_data = std::make_shared(); - Field dummy_field("", value_type); + const auto dict_data = std::make_shared(); + const Field dummy_field("", value_type); RETURN_NOT_OK(loader.Load(&dummy_field, dict_data.get())); if (compression != Compression::UNCOMPRESSED) { @@ -711,18 +714,27 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo, } if (dictionary_batch->isDelta()) { + if (kind != nullptr) { + *kind = DictionaryKind::Delta; + } return dictionary_memo->AddDictionaryDelta(id, dict_data); } - return dictionary_memo->AddOrReplaceDictionary(id, dict_data); + ARROW_ASSIGN_OR_RAISE(bool inserted, + dictionary_memo->AddOrReplaceDictionary(id, dict_data)); + if (kind != nullptr) { + *kind = inserted ? DictionaryKind::New : DictionaryKind::Replacement; + } + return Status::OK(); } Status ReadDictionary(const Message& message, DictionaryMemo* dictionary_memo, - const IpcReadOptions& options) { + const IpcReadOptions& options, DictionaryKind* kind) { // Only invoke this method if we already know we have a dictionary message DCHECK_EQ(message.type(), MessageType::DICTIONARY_BATCH); CHECK_HAS_BODY(message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message.body())); - return ReadDictionary(*message.metadata(), dictionary_memo, options, reader.get()); + return ReadDictionary(*message.metadata(), dictionary_memo, options, kind, + reader.get()); } // ---------------------------------------------------------------------- @@ -736,8 +748,7 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { options_ = options; // Read schema - ARROW_ASSIGN_OR_RAISE(std::unique_ptr message, - message_reader_->ReadNextMessage()); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr message, ReadNextMessage()); if (!message) { return Status::Invalid("Tried reading schema message, was null or length 0"); } @@ -760,11 +771,11 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { // Continue to read other dictionaries, if any std::unique_ptr message; - ARROW_ASSIGN_OR_RAISE(message, message_reader_->ReadNextMessage()); + ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage()); while (message != nullptr && message->type() == MessageType::DICTIONARY_BATCH) { - RETURN_NOT_OK(ReadDictionary(*message, &dictionary_memo_, options_)); - ARROW_ASSIGN_OR_RAISE(message, message_reader_->ReadNextMessage()); + RETURN_NOT_OK(ReadDictionary(*message)); + ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage()); } if (message == nullptr) { @@ -782,7 +793,45 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { std::shared_ptr schema() const override { return out_schema_; } + ReadStats stats() const override { return stats_; } + private: + Result> ReadNextMessage() { + ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage()); + if (message) { + ++stats_.num_messages; + switch (message->type()) { + case MessageType::RECORD_BATCH: + ++stats_.num_record_batches; + break; + case MessageType::DICTIONARY_BATCH: + ++stats_.num_dictionary_batches; + break; + default: + break; + } + } + return std::move(message); + } + + // Read dictionary from dictionary batch + Status ReadDictionary(const Message& message) { + DictionaryKind kind; + RETURN_NOT_OK( + ::arrow::ipc::ReadDictionary(message, &dictionary_memo_, options_, &kind)); + switch (kind) { + case DictionaryKind::New: + break; + case DictionaryKind::Delta: + ++stats_.num_dictionary_deltas; + break; + case DictionaryKind::Replacement: + ++stats_.num_replaced_dictionaries; + break; + } + return Status::OK(); + } + Status ReadInitialDictionaries() { // We must receive all dictionaries before reconstructing the // first record batch. Subsequent dictionary deltas modify the memo @@ -792,7 +841,7 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { // those found in the schema const auto num_dicts = dictionary_memo_.fields().num_fields(); for (int i = 0; i < num_dicts; ++i) { - ARROW_ASSIGN_OR_RAISE(message, message_reader_->ReadNextMessage()); + ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage()); if (!message) { if (i == 0) { /// ARROW-6006: If we fail to find any dictionaries in the stream, then @@ -814,7 +863,7 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { return Status::Invalid("IPC stream did not have the expected number (", num_dicts, ") of dictionaries at the start of the stream"); } - RETURN_NOT_OK(ReadDictionary(*message, &dictionary_memo_, options_)); + RETURN_NOT_OK(ReadDictionary(*message)); } have_read_initial_dictionaries_ = true; @@ -831,6 +880,8 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { // and so the reader should not attempt to parse any messages bool empty_stream_ = false; + ReadStats stats_; + DictionaryMemo dictionary_memo_; std::shared_ptr schema_, out_schema_; }; @@ -838,7 +889,7 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { // ---------------------------------------------------------------------- // Stream reader constructors -Result> RecordBatchStreamReader::Open( +Result> RecordBatchStreamReader::Open( std::unique_ptr message_reader, const IpcReadOptions& options) { // Private ctor auto result = std::make_shared(); @@ -846,12 +897,12 @@ Result> RecordBatchStreamReader::Open( return result; } -Result> RecordBatchStreamReader::Open( +Result> RecordBatchStreamReader::Open( io::InputStream* stream, const IpcReadOptions& options) { return Open(MessageReader::Open(stream), options); } -Result> RecordBatchStreamReader::Open( +Result> RecordBatchStreamReader::Open( const std::shared_ptr& stream, const IpcReadOptions& options) { return Open(MessageReader::Open(stream), options); } @@ -884,13 +935,16 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { read_dictionaries_ = true; } - std::unique_ptr message; - RETURN_NOT_OK(ReadMessageFromBlock(GetRecordBatchBlock(i), &message)); + ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetRecordBatchBlock(i))); CHECK_HAS_BODY(*message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); - return ReadRecordBatchInternal(*message->metadata(), schema_, field_inclusion_mask_, - &dictionary_memo_, options_, reader.get()); + ARROW_ASSIGN_OR_RAISE( + auto batch, + ReadRecordBatchInternal(*message->metadata(), schema_, field_inclusion_mask_, + &dictionary_memo_, options_, reader.get())); + ++stats_.num_record_batches; + return batch; } Status Open(const std::shared_ptr& file, int64_t footer_offset, @@ -907,14 +961,18 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { RETURN_NOT_OK(ReadFooter()); // Get the schema and record any observed dictionaries - return UnpackSchemaMessage(footer_->schema(), options, &dictionary_memo_, &schema_, - &out_schema_, &field_inclusion_mask_); + RETURN_NOT_OK(UnpackSchemaMessage(footer_->schema(), options, &dictionary_memo_, + &schema_, &out_schema_, &field_inclusion_mask_)); + ++stats_.num_messages; + return Status::OK(); } std::shared_ptr schema() const override { return out_schema_; } std::shared_ptr metadata() const override { return metadata_; } + ReadStats stats() const override { return stats_; } + private: FileBlock GetRecordBatchBlock(int i) const { return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); @@ -924,7 +982,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); } - Status ReadMessageFromBlock(const FileBlock& block, std::unique_ptr* out) { + Result> ReadMessageFromBlock(const FileBlock& block) { if (!BitUtil::IsMultipleOf8(block.offset) || !BitUtil::IsMultipleOf8(block.metadata_length) || !BitUtil::IsMultipleOf8(block.body_length)) { @@ -934,19 +992,28 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { // TODO(wesm): this breaks integration tests, see ARROW-3256 // DCHECK_EQ((*out)->body_length(), block.body_length); - return ReadMessage(block.offset, block.metadata_length, file_).Value(out); + ARROW_ASSIGN_OR_RAISE(auto message, + ReadMessage(block.offset, block.metadata_length, file_)); + ++stats_.num_messages; + return std::move(message); } Status ReadDictionaries() { // Read all the dictionaries for (int i = 0; i < num_dictionaries(); ++i) { - std::unique_ptr message; - RETURN_NOT_OK(ReadMessageFromBlock(GetDictionaryBlock(i), &message)); + ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetDictionaryBlock(i))); CHECK_HAS_BODY(*message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); + DictionaryKind kind; RETURN_NOT_OK(ReadDictionary(*message->metadata(), &dictionary_memo_, options_, - reader.get())); + &kind, reader.get())); + ++stats_.num_dictionary_batches; + if (kind != DictionaryKind::New) { + return Status::Invalid( + "Unsupported dictionary replacement or " + "dictionary delta in IPC file"); + } } return Status::OK(); } @@ -1026,6 +1093,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { std::shared_ptr schema_; // Schema with deselected fields dropped std::shared_ptr out_schema_; + + ReadStats stats_; }; Result> RecordBatchFileReader::Open( @@ -1141,7 +1210,7 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener { dictionary_memo_.fields().num_fields(), ") of dictionaries at the start of the stream"); } - RETURN_NOT_OK(ReadDictionary(*message, &dictionary_memo_, options_)); + RETURN_NOT_OK(ReadDictionary(*message)); n_required_dictionaries_--; if (n_required_dictionaries_ == 0) { state_ = State::RECORD_BATCHES; @@ -1152,7 +1221,7 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener { Status OnRecordBatchMessageDecoded(std::unique_ptr message) { if (message->type() == MessageType::DICTIONARY_BATCH) { - return ReadDictionary(*message, &dictionary_memo_, options_); + return ReadDictionary(*message); } else { CHECK_HAS_BODY(*message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); @@ -1164,6 +1233,14 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener { } } + // Read dictionary from dictionary batch + Status ReadDictionary(const Message& message) { + // TODO accumulate and expose ReadStats + DictionaryKind unused_kind; + return ::arrow::ipc::ReadDictionary(message, &dictionary_memo_, options_, + &unused_kind); + } + std::shared_ptr listener_; IpcReadOptions options_; State state_; diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index d7ff661fb60..d7a6a66692e 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -54,6 +54,23 @@ struct IpcPayload; using RecordBatchReader = ::arrow::RecordBatchReader; +struct ReadStats { + /// Number of IPC messages read. + int64_t num_messages = 0; + /// Number of record batches read. + int64_t num_record_batches = 0; + /// Number of dictionary batches read. + /// + /// Note: num_dictionary_batches >= num_dictionary_deltas + num_replaced_dictionaries + int64_t num_dictionary_batches = 0; + + /// Number of dictionary deltas read. + int64_t num_dictionary_deltas = 0; + /// Number of replaced dictionaries (i.e. where a dictionary batch replaces + /// an existing dictionary with an unrelated new dictionary). + int64_t num_replaced_dictionaries = 0; +}; + /// \class RecordBatchStreamReader /// \brief Synchronous batch stream reader that reads from io::InputStream /// @@ -68,7 +85,7 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader { /// \param[in] message_reader a MessageReader implementation /// \param[in] options any IPC reading options (optional) /// \return the created batch reader - static Result> Open( + static Result> Open( std::unique_ptr message_reader, const IpcReadOptions& options = IpcReadOptions::Defaults()); @@ -78,7 +95,7 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader { /// lifetime of stream reader /// \param[in] options any IPC reading options (optional) /// \return the created batch reader - static Result> Open( + static Result> Open( io::InputStream* stream, const IpcReadOptions& options = IpcReadOptions::Defaults()); @@ -86,9 +103,12 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader { /// \param[in] stream the input stream /// \param[in] options any IPC reading options (optional) /// \return the created batch reader - static Result> Open( + static Result> Open( const std::shared_ptr& stream, const IpcReadOptions& options = IpcReadOptions::Defaults()); + + /// \brief Return current read statistics + virtual ReadStats stats() const = 0; }; /// \brief Reads the record batch file format @@ -159,6 +179,9 @@ class ARROW_EXPORT RecordBatchFileReader { /// \param[in] i the index of the record batch to return /// \return the read batch virtual Result> ReadRecordBatch(int i) = 0; + + /// \brief Return current read statistics + virtual ReadStats stats() const = 0; }; /// \class Listener diff --git a/cpp/src/arrow/ipc/tensor_test.cc b/cpp/src/arrow/ipc/tensor_test.cc new file mode 100644 index 00000000000..7af1492f624 --- /dev/null +++ b/cpp/src/arrow/ipc/tensor_test.cc @@ -0,0 +1,506 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include + +#include + +#include "arrow/io/file.h" +#include "arrow/io/memory.h" +#include "arrow/io/test_common.h" +#include "arrow/ipc/reader.h" +#include "arrow/ipc/test_common.h" +#include "arrow/ipc/writer.h" +#include "arrow/sparse_tensor.h" +#include "arrow/status.h" +#include "arrow/tensor.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" +#include "arrow/type.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/io_util.h" + +namespace arrow { + +using internal::checked_cast; +using internal::GetByteWidth; +using internal::TemporaryDir; + +namespace ipc { +namespace test { + +class BaseTensorTest : public ::testing::Test, public io::MemoryMapFixture { + public: + void SetUp() { ASSERT_OK_AND_ASSIGN(temp_dir_, TemporaryDir::Make("ipc-test-")); } + + void TearDown() { io::MemoryMapFixture::TearDown(); } + + protected: + std::shared_ptr mmap_; + std::unique_ptr temp_dir_; +}; + +class TestTensorRoundTrip : public BaseTensorTest { + public: + void CheckTensorRoundTrip(const Tensor& tensor) { + int32_t metadata_length; + int64_t body_length; + const int elem_size = GetByteWidth(*tensor.type()); + + ASSERT_OK(mmap_->Seek(0)); + + ASSERT_OK(WriteTensor(tensor, mmap_.get(), &metadata_length, &body_length)); + + const int64_t expected_body_length = elem_size * tensor.size(); + ASSERT_EQ(expected_body_length, body_length); + + ASSERT_OK(mmap_->Seek(0)); + + std::shared_ptr result; + ASSERT_OK_AND_ASSIGN(result, ReadTensor(mmap_.get())); + + ASSERT_EQ(result->data()->size(), expected_body_length); + ASSERT_TRUE(tensor.Equals(*result)); + } + + protected: + std::shared_ptr mmap_; + std::unique_ptr temp_dir_; +}; + +TEST_F(TestTensorRoundTrip, BasicRoundtrip) { + std::string path = "test-write-tensor"; + constexpr int64_t kBufferSize = 1 << 20; + ASSERT_OK_AND_ASSIGN(mmap_, io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); + + std::vector shape = {4, 6}; + std::vector strides = {48, 8}; + std::vector dim_names = {"foo", "bar"}; + int64_t size = 24; + + std::vector values; + randint(size, 0, 100, &values); + + auto data = Buffer::Wrap(values); + + Tensor t0(int64(), data, shape, strides, dim_names); + Tensor t_no_dims(int64(), data, {}, {}, {}); + Tensor t_zero_length_dim(int64(), data, {0}, {8}, {"foo"}); + + CheckTensorRoundTrip(t0); + CheckTensorRoundTrip(t_no_dims); + CheckTensorRoundTrip(t_zero_length_dim); + + int64_t serialized_size; + ASSERT_OK(GetTensorSize(t0, &serialized_size)); + ASSERT_TRUE(serialized_size > static_cast(size * sizeof(int64_t))); + + // ARROW-2840: Check that padding/alignment minded + std::vector shape_2 = {1, 1}; + std::vector strides_2 = {8, 8}; + Tensor t0_not_multiple_64(int64(), data, shape_2, strides_2, dim_names); + CheckTensorRoundTrip(t0_not_multiple_64); +} + +TEST_F(TestTensorRoundTrip, NonContiguous) { + std::string path = "test-write-tensor-strided"; + constexpr int64_t kBufferSize = 1 << 20; + ASSERT_OK_AND_ASSIGN(mmap_, io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); + + std::vector values; + randint(24, 0, 100, &values); + + auto data = Buffer::Wrap(values); + Tensor tensor(int64(), data, {4, 3}, {48, 16}); + + CheckTensorRoundTrip(tensor); +} + +template +class TestSparseTensorRoundTrip : public BaseTensorTest { + public: + void CheckSparseCOOTensorRoundTrip(const SparseCOOTensor& sparse_tensor) { + const int elem_size = GetByteWidth(*sparse_tensor.type()); + const int index_elem_size = sizeof(typename IndexValueType::c_type); + + int32_t metadata_length; + int64_t body_length; + + ASSERT_OK(mmap_->Seek(0)); + + ASSERT_OK( + WriteSparseTensor(sparse_tensor, mmap_.get(), &metadata_length, &body_length)); + + const auto& sparse_index = + checked_cast(*sparse_tensor.sparse_index()); + const int64_t indices_length = + BitUtil::RoundUpToMultipleOf8(index_elem_size * sparse_index.indices()->size()); + const int64_t data_length = + BitUtil::RoundUpToMultipleOf8(elem_size * sparse_tensor.non_zero_length()); + const int64_t expected_body_length = indices_length + data_length; + ASSERT_EQ(expected_body_length, body_length); + + ASSERT_OK(mmap_->Seek(0)); + + std::shared_ptr result; + ASSERT_OK_AND_ASSIGN(result, ReadSparseTensor(mmap_.get())); + ASSERT_EQ(SparseTensorFormat::COO, result->format_id()); + + const auto& resulted_sparse_index = + checked_cast(*result->sparse_index()); + ASSERT_EQ(resulted_sparse_index.indices()->data()->size(), indices_length); + ASSERT_EQ(resulted_sparse_index.is_canonical(), sparse_index.is_canonical()); + ASSERT_EQ(result->data()->size(), data_length); + ASSERT_TRUE(result->Equals(sparse_tensor)); + } + + template + void CheckSparseCSXMatrixRoundTrip( + const SparseTensorImpl& sparse_tensor) { + static_assert(std::is_same::value || + std::is_same::value, + "SparseIndexType must be either SparseCSRIndex or SparseCSCIndex"); + + const int elem_size = GetByteWidth(*sparse_tensor.type()); + const int index_elem_size = sizeof(typename IndexValueType::c_type); + + int32_t metadata_length; + int64_t body_length; + + ASSERT_OK(mmap_->Seek(0)); + + ASSERT_OK( + WriteSparseTensor(sparse_tensor, mmap_.get(), &metadata_length, &body_length)); + + const auto& sparse_index = + checked_cast(*sparse_tensor.sparse_index()); + const int64_t indptr_length = + BitUtil::RoundUpToMultipleOf8(index_elem_size * sparse_index.indptr()->size()); + const int64_t indices_length = + BitUtil::RoundUpToMultipleOf8(index_elem_size * sparse_index.indices()->size()); + const int64_t data_length = + BitUtil::RoundUpToMultipleOf8(elem_size * sparse_tensor.non_zero_length()); + const int64_t expected_body_length = indptr_length + indices_length + data_length; + ASSERT_EQ(expected_body_length, body_length); + + ASSERT_OK(mmap_->Seek(0)); + + std::shared_ptr result; + ASSERT_OK_AND_ASSIGN(result, ReadSparseTensor(mmap_.get())); + + constexpr auto expected_format_id = + std::is_same::value ? SparseTensorFormat::CSR + : SparseTensorFormat::CSC; + ASSERT_EQ(expected_format_id, result->format_id()); + + const auto& resulted_sparse_index = + checked_cast(*result->sparse_index()); + ASSERT_EQ(resulted_sparse_index.indptr()->data()->size(), indptr_length); + ASSERT_EQ(resulted_sparse_index.indices()->data()->size(), indices_length); + ASSERT_EQ(result->data()->size(), data_length); + ASSERT_TRUE(result->Equals(sparse_tensor)); + } + + void CheckSparseCSFTensorRoundTrip(const SparseCSFTensor& sparse_tensor) { + const int elem_size = GetByteWidth(*sparse_tensor.type()); + const int index_elem_size = sizeof(typename IndexValueType::c_type); + + int32_t metadata_length; + int64_t body_length; + + ASSERT_OK(mmap_->Seek(0)); + + ASSERT_OK( + WriteSparseTensor(sparse_tensor, mmap_.get(), &metadata_length, &body_length)); + + const auto& sparse_index = + checked_cast(*sparse_tensor.sparse_index()); + + const int64_t ndim = sparse_index.axis_order().size(); + int64_t indptr_length = 0; + int64_t indices_length = 0; + + for (int64_t i = 0; i < ndim - 1; ++i) { + indptr_length += BitUtil::RoundUpToMultipleOf8(index_elem_size * + sparse_index.indptr()[i]->size()); + } + for (int64_t i = 0; i < ndim; ++i) { + indices_length += BitUtil::RoundUpToMultipleOf8(index_elem_size * + sparse_index.indices()[i]->size()); + } + const int64_t data_length = + BitUtil::RoundUpToMultipleOf8(elem_size * sparse_tensor.non_zero_length()); + const int64_t expected_body_length = indptr_length + indices_length + data_length; + ASSERT_EQ(expected_body_length, body_length); + + ASSERT_OK(mmap_->Seek(0)); + + std::shared_ptr result; + ASSERT_OK_AND_ASSIGN(result, ReadSparseTensor(mmap_.get())); + ASSERT_EQ(SparseTensorFormat::CSF, result->format_id()); + + const auto& resulted_sparse_index = + checked_cast(*result->sparse_index()); + + int64_t out_indptr_length = 0; + int64_t out_indices_length = 0; + for (int i = 0; i < ndim - 1; ++i) { + out_indptr_length += BitUtil::RoundUpToMultipleOf8( + index_elem_size * resulted_sparse_index.indptr()[i]->size()); + } + for (int i = 0; i < ndim; ++i) { + out_indices_length += BitUtil::RoundUpToMultipleOf8( + index_elem_size * resulted_sparse_index.indices()[i]->size()); + } + + ASSERT_EQ(out_indptr_length, indptr_length); + ASSERT_EQ(out_indices_length, indices_length); + ASSERT_EQ(result->data()->size(), data_length); + ASSERT_TRUE(resulted_sparse_index.Equals(sparse_index)); + ASSERT_TRUE(result->Equals(sparse_tensor)); + } + + protected: + std::shared_ptr MakeSparseCOOIndex( + const std::vector& coords_shape, + const std::vector& coords_strides, + std::vector& coords_values) const { + auto coords_data = Buffer::Wrap(coords_values); + auto coords = std::make_shared>( + coords_data, coords_shape, coords_strides); + return std::make_shared(coords); + } + + template + Result> MakeSparseCOOTensor( + const std::shared_ptr& si, std::vector& sparse_values, + const std::vector& shape, + const std::vector& dim_names = {}) const { + auto data = Buffer::Wrap(sparse_values); + return SparseCOOTensor::Make(si, CTypeTraits::type_singleton(), data, + shape, dim_names); + } +}; + +TYPED_TEST_SUITE_P(TestSparseTensorRoundTrip); + +TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCOOIndexRowMajor) { + using IndexValueType = TypeParam; + using c_index_value_type = typename IndexValueType::c_type; + + std::string path = "test-write-sparse-coo-tensor"; + constexpr int64_t kBufferSize = 1 << 20; + ASSERT_OK_AND_ASSIGN(this->mmap_, + io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); + + // Dense representation: + // [ + // [ + // 1 0 2 0 + // 0 3 0 4 + // 5 0 6 0 + // ], + // [ + // 0 11 0 12 + // 13 0 14 0 + // 0 15 0 16 + // ] + // ] + // + // Sparse representation: + // idx[0] = [0 0 0 0 0 0 1 1 1 1 1 1] + // idx[1] = [0 0 1 1 2 2 0 0 1 1 2 2] + // idx[2] = [0 2 1 3 0 2 1 3 0 2 1 3] + // data = [1 2 3 4 5 6 11 12 13 14 15 16] + + // canonical + std::vector coords_values = {0, 0, 0, 0, 0, 2, 0, 1, 1, 0, 1, 3, + 0, 2, 0, 0, 2, 2, 1, 0, 1, 1, 0, 3, + 1, 1, 0, 1, 1, 2, 1, 2, 1, 1, 2, 3}; + const int sizeof_index_value = sizeof(c_index_value_type); + std::shared_ptr si; + ASSERT_OK_AND_ASSIGN( + si, SparseCOOIndex::Make(TypeTraits::type_singleton(), {12, 3}, + {sizeof_index_value * 3, sizeof_index_value}, + Buffer::Wrap(coords_values))); + ASSERT_TRUE(si->is_canonical()); + + std::vector shape = {2, 3, 4}; + std::vector dim_names = {"foo", "bar", "baz"}; + std::vector values = {1, 2, 3, 4, 5, 6, 11, 12, 13, 14, 15, 16}; + std::shared_ptr st; + ASSERT_OK_AND_ASSIGN(st, this->MakeSparseCOOTensor(si, values, shape, dim_names)); + + this->CheckSparseCOOTensorRoundTrip(*st); + + // non-canonical + ASSERT_OK_AND_ASSIGN( + si, SparseCOOIndex::Make(TypeTraits::type_singleton(), {12, 3}, + {sizeof_index_value * 3, sizeof_index_value}, + Buffer::Wrap(coords_values), false)); + ASSERT_FALSE(si->is_canonical()); + ASSERT_OK_AND_ASSIGN(st, this->MakeSparseCOOTensor(si, values, shape, dim_names)); + + this->CheckSparseCOOTensorRoundTrip(*st); +} + +TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCOOIndexColumnMajor) { + using IndexValueType = TypeParam; + using c_index_value_type = typename IndexValueType::c_type; + + std::string path = "test-write-sparse-coo-tensor"; + constexpr int64_t kBufferSize = 1 << 20; + ASSERT_OK_AND_ASSIGN(this->mmap_, + io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); + + // Dense representation: + // [ + // [ + // 1 0 2 0 + // 0 3 0 4 + // 5 0 6 0 + // ], + // [ + // 0 11 0 12 + // 13 0 14 0 + // 0 15 0 16 + // ] + // ] + // + // Sparse representation: + // idx[0] = [0 0 0 0 0 0 1 1 1 1 1 1] + // idx[1] = [0 0 1 1 2 2 0 0 1 1 2 2] + // idx[2] = [0 2 1 3 0 2 1 3 0 2 1 3] + // data = [1 2 3 4 5 6 11 12 13 14 15 16] + + // canonical + std::vector coords_values = {0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, + 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, + 0, 2, 1, 3, 0, 2, 1, 3, 0, 2, 1, 3}; + const int sizeof_index_value = sizeof(c_index_value_type); + std::shared_ptr si; + ASSERT_OK_AND_ASSIGN( + si, SparseCOOIndex::Make(TypeTraits::type_singleton(), {12, 3}, + {sizeof_index_value, sizeof_index_value * 12}, + Buffer::Wrap(coords_values))); + ASSERT_TRUE(si->is_canonical()); + + std::vector shape = {2, 3, 4}; + std::vector dim_names = {"foo", "bar", "baz"}; + std::vector values = {1, 2, 3, 4, 5, 6, 11, 12, 13, 14, 15, 16}; + + std::shared_ptr st; + ASSERT_OK_AND_ASSIGN(st, this->MakeSparseCOOTensor(si, values, shape, dim_names)); + + this->CheckSparseCOOTensorRoundTrip(*st); + + // non-canonical + ASSERT_OK_AND_ASSIGN( + si, SparseCOOIndex::Make(TypeTraits::type_singleton(), {12, 3}, + {sizeof_index_value, sizeof_index_value * 12}, + Buffer::Wrap(coords_values), false)); + ASSERT_FALSE(si->is_canonical()); + ASSERT_OK_AND_ASSIGN(st, this->MakeSparseCOOTensor(si, values, shape, dim_names)); + + this->CheckSparseCOOTensorRoundTrip(*st); +} + +TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCSRIndex) { + using IndexValueType = TypeParam; + + std::string path = "test-write-sparse-csr-matrix"; + constexpr int64_t kBufferSize = 1 << 20; + ASSERT_OK_AND_ASSIGN(this->mmap_, + io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); + + std::vector shape = {4, 6}; + std::vector dim_names = {"foo", "bar", "baz"}; + std::vector values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + + auto data = Buffer::Wrap(values); + NumericTensor t(data, shape, {}, dim_names); + std::shared_ptr st; + ASSERT_OK_AND_ASSIGN( + st, SparseCSRMatrix::Make(t, TypeTraits::type_singleton())); + + this->CheckSparseCSXMatrixRoundTrip(*st); +} + +TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCSCIndex) { + using IndexValueType = TypeParam; + + std::string path = "test-write-sparse-csc-matrix"; + constexpr int64_t kBufferSize = 1 << 20; + ASSERT_OK_AND_ASSIGN(this->mmap_, + io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); + + std::vector shape = {4, 6}; + std::vector dim_names = {"foo", "bar", "baz"}; + std::vector values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + + auto data = Buffer::Wrap(values); + NumericTensor t(data, shape, {}, dim_names); + std::shared_ptr st; + ASSERT_OK_AND_ASSIGN( + st, SparseCSCMatrix::Make(t, TypeTraits::type_singleton())); + + this->CheckSparseCSXMatrixRoundTrip(*st); +} + +TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCSFIndex) { + using IndexValueType = TypeParam; + + std::string path = "test-write-sparse-csf-tensor"; + constexpr int64_t kBufferSize = 1 << 20; + ASSERT_OK_AND_ASSIGN(this->mmap_, + io::MemoryMapFixture::InitMemoryMap(kBufferSize, path)); + + std::vector shape = {4, 6}; + std::vector dim_names = {"foo", "bar", "baz"}; + std::vector values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + + auto data = Buffer::Wrap(values); + NumericTensor t(data, shape, {}, dim_names); + std::shared_ptr st; + ASSERT_OK_AND_ASSIGN( + st, SparseCSFTensor::Make(t, TypeTraits::type_singleton())); + + this->CheckSparseCSFTensorRoundTrip(*st); +} +REGISTER_TYPED_TEST_SUITE_P(TestSparseTensorRoundTrip, WithSparseCOOIndexRowMajor, + WithSparseCOOIndexColumnMajor, WithSparseCSRIndex, + WithSparseCSCIndex, WithSparseCSFIndex); + +INSTANTIATE_TYPED_TEST_SUITE_P(TestInt8, TestSparseTensorRoundTrip, Int8Type); +INSTANTIATE_TYPED_TEST_SUITE_P(TestUInt8, TestSparseTensorRoundTrip, UInt8Type); +INSTANTIATE_TYPED_TEST_SUITE_P(TestInt16, TestSparseTensorRoundTrip, Int16Type); +INSTANTIATE_TYPED_TEST_SUITE_P(TestUInt16, TestSparseTensorRoundTrip, UInt16Type); +INSTANTIATE_TYPED_TEST_SUITE_P(TestInt32, TestSparseTensorRoundTrip, Int32Type); +INSTANTIATE_TYPED_TEST_SUITE_P(TestUInt32, TestSparseTensorRoundTrip, UInt32Type); +INSTANTIATE_TYPED_TEST_SUITE_P(TestInt64, TestSparseTensorRoundTrip, Int64Type); + +} // namespace test +} // namespace ipc +} // namespace arrow diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 14aa47b7b26..9f9be32e246 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -66,7 +67,7 @@ namespace ipc { using internal::FileBlock; using internal::kArrowMagicBytes; -namespace internal { +namespace { Status GetTruncatedBitmap(int64_t offset, int64_t length, const std::shared_ptr input, MemoryPool* pool, @@ -523,8 +524,8 @@ class RecordBatchSerializer { std::shared_ptr custom_metadata_; - std::vector field_nodes_; - std::vector buffer_meta_; + std::vector field_nodes_; + std::vector buffer_meta_; const IpcWriteOptions& options_; int64_t max_recursion_depth_; @@ -557,7 +558,7 @@ class DictionarySerializer : public RecordBatchSerializer { bool is_delta_; }; -} // namespace internal +} // namespace Status WriteIpcPayload(const IpcPayload& payload, const IpcWriteOptions& options, io::OutputStream* dst, int32_t* metadata_length) { @@ -611,15 +612,14 @@ Status GetDictionaryPayload(int64_t id, bool is_delta, const IpcWriteOptions& options, IpcPayload* out) { out->type = MessageType::DICTIONARY_BATCH; // Frame of reference is 0, see ARROW-384 - internal::DictionarySerializer assembler(id, is_delta, /*buffer_start_offset=*/0, - options, out); + DictionarySerializer assembler(id, is_delta, /*buffer_start_offset=*/0, options, out); return assembler.Assemble(dictionary); } Status GetRecordBatchPayload(const RecordBatch& batch, const IpcWriteOptions& options, IpcPayload* out) { out->type = MessageType::RECORD_BATCH; - internal::RecordBatchSerializer assembler(/*buffer_start_offset=*/0, options, out); + RecordBatchSerializer assembler(/*buffer_start_offset=*/0, options, out); return assembler.Assemble(batch); } @@ -627,7 +627,7 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, const IpcWriteOptions& options) { IpcPayload payload; - internal::RecordBatchSerializer assembler(buffer_start_offset, options, &payload); + RecordBatchSerializer assembler(buffer_start_offset, options, &payload); RETURN_NOT_OK(assembler.Assemble(batch)); // TODO: it's a rough edge that the metadata and body length here are @@ -962,16 +962,19 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter { public: // A RecordBatchWriter implementation that writes to a IpcPayloadWriter. IpcFormatWriter(std::unique_ptr payload_writer, - const Schema& schema, const IpcWriteOptions& options) + const Schema& schema, const IpcWriteOptions& options, + bool is_file_format) : payload_writer_(std::move(payload_writer)), schema_(schema), mapper_(schema), + is_file_format_(is_file_format), options_(options) {} // A Schema-owning constructor variant IpcFormatWriter(std::unique_ptr payload_writer, - const std::shared_ptr& schema, const IpcWriteOptions& options) - : IpcFormatWriter(std::move(payload_writer), *schema, options) { + const std::shared_ptr& schema, const IpcWriteOptions& options, + bool is_file_format) + : IpcFormatWriter(std::move(payload_writer), *schema, options, is_file_format) { shared_schema_ = schema; } @@ -982,13 +985,7 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter { RETURN_NOT_OK(CheckStarted()); - if (!wrote_dictionaries_) { - RETURN_NOT_OK(WriteDictionaries(batch)); - wrote_dictionaries_ = true; - } - - // TODO: Check for delta dictionaries. Can we scan for deltas while computing - // the RecordBatch payload to save time? + RETURN_NOT_OK(WriteDictionaries(batch)); IpcPayload payload; RETURN_NOT_OK(GetRecordBatchPayload(batch, options_, &payload)); @@ -1025,8 +1022,36 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter { int64_t dictionary_id = pair.first; const auto& dictionary = pair.second; + // If a dictionary with this id was already emitted, check if it was the same. + auto* last_dictionary = &last_dictionaries_[dictionary_id]; + const bool dictionary_exists = (*last_dictionary != nullptr); + if (dictionary_exists) { + if ((*last_dictionary)->data() == dictionary->data()) { + // Fast shortcut for a common case. + // Same dictionary data by pointer => no need to emit it again + continue; + } + if ((*last_dictionary)->Equals(dictionary, EqualOptions().nans_equal(true))) { + // Same dictionary by value => no need to emit it again + // (while this can have a CPU cost, this code path is required + // for the IPC file format) + continue; + } + // TODO check for possible delta? + } + + if (is_file_format_ && dictionary_exists) { + return Status::Invalid( + "Dictionary replacement detected when writing IPC file format. " + "Arrow IPC files only support a single dictionary for a given field " + "accross all batches."); + } + RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, options_, &payload)); RETURN_NOT_OK(payload_writer_->WritePayload(payload)); + + // Remember dictionary for next batches + *last_dictionary = dictionary; } return Status::OK(); } @@ -1035,8 +1060,16 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter { std::shared_ptr shared_schema_; const Schema& schema_; const DictionaryFieldMapper mapper_; + const bool is_file_format_; + + // A map of last-written dictionaries by id. + // This is required to avoid the same dictionary again and again, + // and also for correctness when writing the IPC file format + // (where replacements and deltas are unsupported). + // The latter is also why we can't use weak_ptr. + std::unordered_map> last_dictionaries_; + bool started_ = false; - bool wrote_dictionaries_ = false; IpcWriteOptions options_; }; @@ -1214,7 +1247,7 @@ Result> MakeStreamWriter( const IpcWriteOptions& options) { return std::make_shared( ::arrow::internal::make_unique(sink, options), - schema, options); + schema, options, /*is_file_format=*/false); } Result> MakeStreamWriter( @@ -1223,7 +1256,7 @@ Result> MakeStreamWriter( return std::make_shared( ::arrow::internal::make_unique(std::move(sink), options), - schema, options); + schema, options, /*is_file_format=*/false); } Result> NewStreamWriter( @@ -1239,7 +1272,7 @@ Result> MakeFileWriter( return std::make_shared( ::arrow::internal::make_unique(options, schema, metadata, sink), - schema, options); + schema, options, /*is_file_format=*/true); } Result> MakeFileWriter( @@ -1249,7 +1282,7 @@ Result> MakeFileWriter( return std::make_shared( ::arrow::internal::make_unique( options, schema, metadata, std::move(sink)), - schema, options); + schema, options, /*is_file_format=*/true); } Result> NewFileWriter( @@ -1265,8 +1298,8 @@ Result> OpenRecordBatchWriter( std::unique_ptr sink, const std::shared_ptr& schema, const IpcWriteOptions& options) { // XXX should we call Start()? - return ::arrow::internal::make_unique(std::move(sink), - schema, options); + return ::arrow::internal::make_unique( + std::move(sink), schema, options, /*is_file_format=*/false); } Result> MakePayloadStreamWriter( @@ -1328,9 +1361,10 @@ Result> SerializeSchema(const Schema& schema, MemoryPool ARROW_ASSIGN_OR_RAISE(auto stream, io::BufferOutputStream::Create(1024, pool)); auto options = IpcWriteOptions::Defaults(); + const bool is_file_format = false; // indifferent as we don't write dictionaries internal::IpcFormatWriter writer( ::arrow::internal::make_unique(stream.get()), schema, - options); + options, is_file_format); RETURN_NOT_OK(writer.Start()); return stream->Finish(); } diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index 0d2edf61350..48f3a9a3b58 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -414,6 +414,9 @@ Result> MakePayloadFileWriter( /// Create a new RecordBatchWriter from IpcPayloadWriter and schema. /// +/// The format is implicitly the IPC stream format (allowing dictionary +/// replacement and deltas). +/// /// \param[in] sink the IpcPayloadWriter to write to /// \param[in] schema the schema of the record batches to be written /// \param[in] options options for serialization diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc index 4b3fd8682c2..f88230ffd64 100644 --- a/cpp/src/arrow/pretty_print.cc +++ b/cpp/src/arrow/pretty_print.cc @@ -552,7 +552,7 @@ Status PrettyPrint(const Table& table, const PrettyPrintOptions& options, } Status DebugPrint(const Array& arr, int indent) { - return PrettyPrint(arr, indent, &std::cout); + return PrettyPrint(arr, indent, &std::cerr); } class SchemaPrinter : public PrettyPrinter {