From 615f4b592ae45b0fab6d69176648ef35655cb693 Mon Sep 17 00:00:00 2001
From: liyafan82
Date: Thu, 25 Jun 2020 22:13:33 +0800
Subject: [PATCH 1/3] ARROW-7285: [C++] ensure C++ implementation meets clarified dictionary spec

---
 cpp/src/arrow/ipc/dictionary.cc        |   9 ++
 cpp/src/arrow/ipc/dictionary.h         |   4 +
 cpp/src/arrow/ipc/metadata_internal.cc |   5 +-
 cpp/src/arrow/ipc/metadata_internal.h  |   2 +-
 cpp/src/arrow/ipc/read_write_test.cc   | 146 +++++++++++++++++++++++++
 cpp/src/arrow/ipc/reader.cc            |  40 ++++---
 cpp/src/arrow/ipc/writer.cc            |  27 ++++-
 cpp/src/arrow/ipc/writer.h             |  27 ++++-
 8 files changed, 238 insertions(+), 22 deletions(-)

diff --git a/cpp/src/arrow/ipc/dictionary.cc b/cpp/src/arrow/ipc/dictionary.cc
index 4d4f60575b6..e774193b6a1 100644
--- a/cpp/src/arrow/ipc/dictionary.cc
+++ b/cpp/src/arrow/ipc/dictionary.cc
@@ -142,6 +142,15 @@ Status DictionaryMemo::AddDictionary(int64_t id,
   return Status::OK();
 }
 
+Status DictionaryMemo::UpdateDictionary(int64_t id,
+                                        const std::shared_ptr& dictionary) {
+  if (!HasDictionary(id)) {
+    return Status::KeyError("Dictionary with id ", id, " does not exists");
+  }
+  id_to_dictionary_[id] = dictionary;
+  return Status::OK();
+}
+
 DictionaryMemo::DictionaryVector DictionaryMemo::dictionaries() const {
   // Sort dictionaries by ascending id. This ensures that, in the case
   // of nested dictionaries, inner dictionaries are emitted before outer
diff --git a/cpp/src/arrow/ipc/dictionary.h b/cpp/src/arrow/ipc/dictionary.h
index dc2c716559b..f70db81e7a2 100644
--- a/cpp/src/arrow/ipc/dictionary.h
+++ b/cpp/src/arrow/ipc/dictionary.h
@@ -78,6 +78,10 @@ class ARROW_EXPORT DictionaryMemo {
   /// KeyError if that dictionary already exists
   Status AddDictionary(int64_t id, const std::shared_ptr& dictionary);
 
+  /// \brief Update a dictionary to the memo with a particular id. Returns
+  /// KeyError if that dictionary does not exists
+  Status UpdateDictionary(int64_t id, const std::shared_ptr& dictionary);
+
   /// \brief The stored dictionaries, in ascending id order.
DictionaryVector dictionaries() const; diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index 3c110dd50cc..4809ade1ab7 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -1212,12 +1212,13 @@ Status WriteDictionaryMessage( int64_t id, int64_t length, int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, - const IpcWriteOptions& options, std::shared_ptr* out) { + const IpcWriteOptions& options, std::shared_ptr* out, bool isDelta) { FBB fbb; RecordBatchOffset record_batch; RETURN_NOT_OK( MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch)); - auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch).Union(); + auto dictionary_batch = + flatbuf::CreateDictionaryBatch(fbb, id, record_batch, isDelta).Union(); return WriteFBMessage(fbb, flatbuf::MessageHeader::DictionaryBatch, dictionary_batch, body_length, custom_metadata) .Value(out); diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h index b0da188363f..fb179a4d5c8 100644 --- a/cpp/src/arrow/ipc/metadata_internal.h +++ b/cpp/src/arrow/ipc/metadata_internal.h @@ -198,7 +198,7 @@ Status WriteDictionaryMessage( const int64_t id, const int64_t length, const int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, - const IpcWriteOptions& options, std::shared_ptr* out); + const IpcWriteOptions& options, std::shared_ptr* out, bool isDelta); static inline Result> WriteFlatbufferBuilder( flatbuffers::FlatBufferBuilder& fbb) { diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 2c1bf1c73a2..1cd4c95868d 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -1228,6 +1228,152 @@ TEST_P(TestFileFormat, RoundTrip) { TestZeroLengthRoundTrip(*GetParam(), options); } +Status MakeDictionaryBatch(std::shared_ptr* out) { + const int64_t length = 6; + + std::vector is_valid = {true, true, false, true, true, true}; + + auto dict_ty = utf8(); + + auto dict = ArrayFromJSON(dict_ty, "[\"foo\", \"bar\", \"baz\"]"); + + auto f0_type = arrow::dictionary(arrow::int32(), dict_ty); + auto f1_type = arrow::dictionary(arrow::int8(), dict_ty); + + std::shared_ptr indices0, indices1; + std::vector indices0_values = {1, 2, -1, 0, 2, 0}; + std::vector indices1_values = {0, 0, 2, 2, 1, 1}; + + ArrayFromVector(is_valid, indices0_values, &indices0); + ArrayFromVector(is_valid, indices1_values, &indices1); + + auto a0 = std::make_shared(f0_type, indices0, dict); + auto a1 = std::make_shared(f1_type, indices1, dict); + + // construct batch + auto schema = ::arrow::schema({field("dict1", f0_type), field("dict2", f1_type)}); + + *out = RecordBatch::Make(schema, length, {a0, a1}); + return Status::OK(); +} + +// A record batch writer implementation that supports manually specifying dictionaries. 
+class TestRecordBatchWriter : public RecordBatchWriter { + public: + explicit TestRecordBatchWriter(const Schema& schema) : schema_(schema) { + buffer_ = *AllocateResizableBuffer(0); + sink_.reset(new io::BufferOutputStream(buffer_)); + payload_writer_ = *internal::NewPayloadStreamWriter(sink_.get()); + } + + Status WriteRecordBatch(const RecordBatch& batch) override { + // dumb implementation + return Status::OK(); + } + + Status Start() { + RETURN_NOT_OK(payload_writer_->Start()); + + // write schema + IpcPayload payload; + RETURN_NOT_OK(GetSchemaPayload(schema_, IpcWriteOptions::Defaults(), + &dictionary_memo_, &payload)); + return payload_writer_->WritePayload(payload); + } + + Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr& dictionary, + bool isDelta) { + IpcPayload payload; + RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, + IpcWriteOptions::Defaults(), &payload, isDelta)); + RETURN_NOT_OK(payload_writer_->WritePayload(payload)); + return Status::OK(); + } + + Status WriteBatchPayload(const RecordBatch& batch) { + // write record batch payload only + IpcPayload payload; + RETURN_NOT_OK(GetRecordBatchPayload(batch, IpcWriteOptions::Defaults(), &payload)); + return payload_writer_->WritePayload(payload); + } + + Status Close() override { + RETURN_NOT_OK(payload_writer_->Close()); + return sink_->Close(); + } + + Status ReadBatch(std::shared_ptr* out_batch) { + auto buf_reader = std::make_shared(buffer_); + std::shared_ptr reader; + ARROW_ASSIGN_OR_RAISE( + reader, RecordBatchStreamReader::Open(buf_reader, IpcReadOptions::Defaults())) + return reader->ReadNext(out_batch); + } + + std::unique_ptr payload_writer_; + const Schema& schema_; + DictionaryMemo dictionary_memo_; + std::shared_ptr buffer_; + std::unique_ptr sink_; +}; + +TEST(TestDictionaryBatch, CombineDictionary) { + std::shared_ptr in_batch; + std::shared_ptr out_batch; + ASSERT_OK(MakeDictionaryBatch(&in_batch)); + + auto dict_ty = utf8(); + auto dict1 = ArrayFromJSON(dict_ty, "[\"foo\", \"bar\"]"); + auto dict2 = ArrayFromJSON(dict_ty, "[\"baz\"]"); + + TestRecordBatchWriter writer(*in_batch->schema()); + ASSERT_OK(writer.Start()); + + // combine dictionaries to make up the + // original dictionaries. + ASSERT_OK(writer.WriteDictionary(0L, dict1, false)); + ASSERT_OK(writer.WriteDictionary(0L, dict2, true)); + + ASSERT_OK(writer.WriteDictionary(1L, dict1, false)); + ASSERT_OK(writer.WriteDictionary(1L, dict2, true)); + + ASSERT_OK(writer.WriteBatchPayload(*in_batch)); + ASSERT_OK(writer.Close()); + + ASSERT_OK(writer.ReadBatch(&out_batch)); + + ASSERT_BATCHES_EQUAL(*in_batch, *out_batch); +} + +TEST(TestDictionaryBatch, ReplaceDictionary) { + std::shared_ptr in_batch; + std::shared_ptr out_batch; + ASSERT_OK(MakeDictionaryBatch(&in_batch)); + + auto dict_ty = utf8(); + auto dict = ArrayFromJSON(dict_ty, "[\"foo\", \"bar\", \"baz\"]"); + auto dict1 = ArrayFromJSON(dict_ty, "[\"foo1\", \"bar1\", \"baz1\"]"); + auto dict2 = ArrayFromJSON(dict_ty, "[\"foo2\", \"bar2\", \"baz2\"]"); + + TestRecordBatchWriter writer(*in_batch->schema()); + ASSERT_OK(writer.Start()); + + // the old dictionaries will be overwritten by + // the new dictionaries with the same ids. 
+ ASSERT_OK(writer.WriteDictionary(0L, dict1, false)); + ASSERT_OK(writer.WriteDictionary(0L, dict, false)); + + ASSERT_OK(writer.WriteDictionary(1L, dict2, false)); + ASSERT_OK(writer.WriteDictionary(1L, dict, false)); + + ASSERT_OK(writer.WriteBatchPayload(*in_batch)); + ASSERT_OK(writer.Close()); + + ASSERT_OK(writer.ReadBatch(&out_batch)); + + ASSERT_BATCHES_EQUAL(*in_batch, *out_batch); +} + TEST_P(TestStreamFormat, RoundTrip) { TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index d35236cc238..6faab72d819 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -29,6 +29,7 @@ #include // IWYU pragma: export #include "arrow/array.h" +#include "arrow/array/concatenate.h" #include "arrow/buffer.h" #include "arrow/extension_type.h" #include "arrow/io/interfaces.h" @@ -684,7 +685,19 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo, return Status::Invalid("Dictionary record batch must only contain one field"); } auto dictionary = batch->column(0); - return dictionary_memo->AddDictionary(id, dictionary); + if (dictionary_batch->isDelta()) { + std::shared_ptr originalDict, combinedDict; + RETURN_NOT_OK(dictionary_memo->GetDictionary(id, &originalDict)); + ArrayVector dictsToCombine{originalDict, dictionary}; + ARROW_ASSIGN_OR_RAISE(combinedDict, Concatenate(dictsToCombine, options.memory_pool)); + return dictionary_memo->UpdateDictionary(id, combinedDict); + } + + if (dictionary_memo->HasDictionary(id)) { + return dictionary_memo->UpdateDictionary(id, dictionary); + } else { + return dictionary_memo->AddDictionary(id, dictionary); + } } Status ParseDictionary(const Message& message, DictionaryMemo* dictionary_memo, @@ -698,8 +711,7 @@ Status ParseDictionary(const Message& message, DictionaryMemo* dictionary_memo, Status UpdateDictionaries(const Message& message, DictionaryMemo* dictionary_memo, const IpcReadOptions& options) { - // TODO(wesm): implement delta dictionaries - return Status::NotImplemented("Delta dictionaries not yet implemented"); + return ParseDictionary(message, dictionary_memo, options); } // ---------------------------------------------------------------------- @@ -735,23 +747,25 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { return Status::OK(); } - ARROW_ASSIGN_OR_RAISE(std::unique_ptr message, - message_reader_->ReadNextMessage()); + std::unique_ptr message; + ARROW_ASSIGN_OR_RAISE(message, message_reader_->ReadNextMessage()); if (message == nullptr) { // End of stream *batch = nullptr; return Status::OK(); } - if (message->type() == MessageType::DICTIONARY_BATCH) { - return UpdateDictionaries(*message, &dictionary_memo_, options_); - } else { - CHECK_HAS_BODY(*message); - ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); - return ReadRecordBatchInternal(*message->metadata(), schema_, field_inclusion_mask_, - &dictionary_memo_, options_, reader.get()) - .Value(batch); + // continue to read other dictionaries, if any + while (message->type() == MessageType::DICTIONARY_BATCH) { + RETURN_NOT_OK(UpdateDictionaries(*message, &dictionary_memo_, options_)); + ARROW_ASSIGN_OR_RAISE(message, message_reader_->ReadNextMessage()); } + + CHECK_HAS_BODY(*message); + ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); + return ReadRecordBatchInternal(*message->metadata(), schema_, field_inclusion_mask_, + 
&dictionary_memo_, options_, reader.get()) + .Value(batch); } std::shared_ptr schema() const override { return out_schema_; } diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 3587490b203..c2fe1f2f033 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -533,14 +533,15 @@ class RecordBatchSerializer { class DictionarySerializer : public RecordBatchSerializer { public: DictionarySerializer(int64_t dictionary_id, int64_t buffer_start_offset, - const IpcWriteOptions& options, IpcPayload* out) + const IpcWriteOptions& options, IpcPayload* out, bool isDelta) : RecordBatchSerializer(buffer_start_offset, options, out), - dictionary_id_(dictionary_id) {} + dictionary_id_(dictionary_id), + isDelta_(isDelta) {} Status SerializeMetadata(int64_t num_rows) override { return WriteDictionaryMessage(dictionary_id_, num_rows, out_->body_length, custom_metadata_, field_nodes_, buffer_meta_, options_, - &out_->metadata); + &out_->metadata, isDelta_); } Status Assemble(const std::shared_ptr& dictionary) { @@ -552,6 +553,7 @@ class DictionarySerializer : public RecordBatchSerializer { private: int64_t dictionary_id_; + bool isDelta_; }; } // namespace internal @@ -599,10 +601,12 @@ Status GetSchemaPayload(const Schema& schema, const IpcWriteOptions& options, } Status GetDictionaryPayload(int64_t id, const std::shared_ptr& dictionary, - const IpcWriteOptions& options, IpcPayload* out) { + const IpcWriteOptions& options, IpcPayload* out, + bool isDelta) { out->type = MessageType::DICTIONARY_BATCH; // Frame of reference is 0, see ARROW-384 - internal::DictionarySerializer assembler(id, /*buffer_start_offset=*/0, options, out); + internal::DictionarySerializer assembler(id, /*buffer_start_offset=*/0, options, out, + isDelta); return assembler.Assemble(dictionary); } @@ -1213,6 +1217,19 @@ Result> OpenRecordBatchWriter( schema, options); } +Result> NewPayloadStreamWriter( + io::OutputStream* sink, const IpcWriteOptions& options) { + return ::arrow::internal::make_unique(sink, options); +} + +Result> NewPayloadFileWriter( + io::OutputStream* sink, const std::shared_ptr& schema, + const IpcWriteOptions& options, + const std::shared_ptr& metadata) { + return ::arrow::internal::make_unique(options, schema, + metadata, sink); +} + } // namespace internal // ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index 23bb17175b9..c698516ad7a 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -292,10 +292,12 @@ Status GetSchemaPayload(const Schema& schema, const IpcWriteOptions& options, /// \param[in] dictionary the dictionary values /// \param[in] options options for serialization /// \param[out] payload the output IpcPayload +/// \param[in] isDelta whether the dictionary is a delta dictionary /// \return Status ARROW_EXPORT Status GetDictionaryPayload(int64_t id, const std::shared_ptr& dictionary, - const IpcWriteOptions& options, IpcPayload* payload); + const IpcWriteOptions& options, IpcPayload* payload, + bool isDelta = false); /// \brief Compute IpcPayload for the given record batch /// \param[in] batch the RecordBatch that is being serialized @@ -341,6 +343,29 @@ class ARROW_EXPORT IpcPayloadWriter { virtual Status Close() = 0; }; +/// Create a new IPC payload stream writer from stream sink. User is +/// responsible for closing the actual OutputStream. 
+/// +/// \param[in] sink output stream to write to +/// \param[in] options options for serialization +/// \return Result> +ARROW_EXPORT +Result> NewPayloadStreamWriter( + io::OutputStream* sink, const IpcWriteOptions& options = IpcWriteOptions::Defaults()); + +/// Create a new IPC payload file writer from stream sink. +/// +/// \param[in] sink output stream to write to +/// \param[in] schema the schema of the record batches to be written +/// \param[in] options options for serialization, optional +/// \param[in] metadata custom metadata for File Footer, optional +/// \return Status +ARROW_EXPORT +Result> NewPayloadFileWriter( + io::OutputStream* sink, const std::shared_ptr& schema, + const IpcWriteOptions& options = IpcWriteOptions::Defaults(), + const std::shared_ptr& metadata = NULLPTR); + /// Create a new RecordBatchWriter from IpcPayloadWriter and schema. /// /// \param[in] sink the IpcPayloadWriter to write to From fbd1150c3f64a6fbade5411700bb4e0caea1e11b Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Wed, 1 Jul 2020 15:13:01 +0800 Subject: [PATCH 2/3] ARROW-7285: [C++] Resolve comments --- cpp/src/arrow/ipc/dictionary.cc | 19 ++++-- cpp/src/arrow/ipc/dictionary.h | 10 ++- cpp/src/arrow/ipc/metadata_internal.cc | 4 +- cpp/src/arrow/ipc/metadata_internal.h | 4 +- cpp/src/arrow/ipc/read_write_test.cc | 90 +++++++++++--------------- cpp/src/arrow/ipc/reader.cc | 14 +--- cpp/src/arrow/ipc/writer.cc | 25 ++++--- cpp/src/arrow/ipc/writer.h | 20 ++++-- 8 files changed, 95 insertions(+), 91 deletions(-) diff --git a/cpp/src/arrow/ipc/dictionary.cc b/cpp/src/arrow/ipc/dictionary.cc index e774193b6a1..8a4f4b642e1 100644 --- a/cpp/src/arrow/ipc/dictionary.cc +++ b/cpp/src/arrow/ipc/dictionary.cc @@ -24,6 +24,7 @@ #include #include "arrow/array.h" +#include "arrow/array/concatenate.h" #include "arrow/extension_type.h" #include "arrow/record_batch.h" #include "arrow/status.h" @@ -142,11 +143,19 @@ Status DictionaryMemo::AddDictionary(int64_t id, return Status::OK(); } -Status DictionaryMemo::UpdateDictionary(int64_t id, - const std::shared_ptr& dictionary) { - if (!HasDictionary(id)) { - return Status::KeyError("Dictionary with id ", id, " does not exists"); - } +Status DictionaryMemo::AddDictionaryDelta(int64_t id, + const std::shared_ptr& dictionary, + MemoryPool* pool) { + std::shared_ptr originalDict, combinedDict; + RETURN_NOT_OK(GetDictionary(id, &originalDict)); + ArrayVector dictsToCombine{originalDict, dictionary}; + ARROW_ASSIGN_OR_RAISE(combinedDict, Concatenate(dictsToCombine, pool)); + id_to_dictionary_[id] = combinedDict; + return Status::OK(); +} + +Status DictionaryMemo::AddOrReplaceDictionary(int64_t id, + const std::shared_ptr& dictionary) { id_to_dictionary_[id] = dictionary; return Status::OK(); } diff --git a/cpp/src/arrow/ipc/dictionary.h b/cpp/src/arrow/ipc/dictionary.h index f70db81e7a2..c8b347cf182 100644 --- a/cpp/src/arrow/ipc/dictionary.h +++ b/cpp/src/arrow/ipc/dictionary.h @@ -25,6 +25,7 @@ #include #include +#include "arrow/memory_pool.h" #include "arrow/status.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -78,9 +79,14 @@ class ARROW_EXPORT DictionaryMemo { /// KeyError if that dictionary already exists Status AddDictionary(int64_t id, const std::shared_ptr& dictionary); - /// \brief Update a dictionary to the memo with a particular id. Returns + /// \brief Append a dictionary delta to the memo with a particular id. 
Returns /// KeyError if that dictionary does not exists - Status UpdateDictionary(int64_t id, const std::shared_ptr& dictionary); + Status AddDictionaryDelta(int64_t id, const std::shared_ptr& dictionary, + MemoryPool* pool); + + /// \brief Add a dictionary to the memo if it does not have one with the id, + /// otherwise, replace the dictionary with the new one. + Status AddOrReplaceDictionary(int64_t id, const std::shared_ptr& dictionary); /// \brief The stored dictionaries, in ascending id order. DictionaryVector dictionaries() const; diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index 4809ade1ab7..4ce4a5d5b40 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -1209,10 +1209,10 @@ Result> WriteSparseTensorMessage( } Status WriteDictionaryMessage( - int64_t id, int64_t length, int64_t body_length, + int64_t id, bool isDelta, int64_t length, int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, - const IpcWriteOptions& options, std::shared_ptr* out, bool isDelta) { + const IpcWriteOptions& options, std::shared_ptr* out) { FBB fbb; RecordBatchOffset record_batch; RETURN_NOT_OK( diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h index fb179a4d5c8..1cfd774dd6e 100644 --- a/cpp/src/arrow/ipc/metadata_internal.h +++ b/cpp/src/arrow/ipc/metadata_internal.h @@ -195,10 +195,10 @@ Status WriteFileFooter(const Schema& schema, const std::vector& dicti io::OutputStream* out); Status WriteDictionaryMessage( - const int64_t id, const int64_t length, const int64_t body_length, + const int64_t id, const bool isDelta, const int64_t length, const int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, - const IpcWriteOptions& options, std::shared_ptr* out, bool isDelta); + const IpcWriteOptions& options, std::shared_ptr* out); static inline Result> WriteFlatbufferBuilder( flatbuffers::FlatBufferBuilder& fbb) { diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 1cd4c95868d..ad447339ef8 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -1229,23 +1229,13 @@ TEST_P(TestFileFormat, RoundTrip) { } Status MakeDictionaryBatch(std::shared_ptr* out) { - const int64_t length = 6; + auto f0_type = arrow::dictionary(int32(), utf8()); + auto f1_type = arrow::dictionary(int8(), utf8()); - std::vector is_valid = {true, true, false, true, true, true}; - - auto dict_ty = utf8(); - - auto dict = ArrayFromJSON(dict_ty, "[\"foo\", \"bar\", \"baz\"]"); - - auto f0_type = arrow::dictionary(arrow::int32(), dict_ty); - auto f1_type = arrow::dictionary(arrow::int8(), dict_ty); - - std::shared_ptr indices0, indices1; - std::vector indices0_values = {1, 2, -1, 0, 2, 0}; - std::vector indices1_values = {0, 0, 2, 2, 1, 1}; + auto dict = ArrayFromJSON(utf8(), "[\"foo\", \"bar\", \"baz\"]"); - ArrayFromVector(is_valid, indices0_values, &indices0); - ArrayFromVector(is_valid, indices1_values, &indices1); + auto indices0 = ArrayFromJSON(int32(), "[1, 2, null, 0, 2, 0]"); + auto indices1 = ArrayFromJSON(int8(), "[0, 0, 2, 2, 1, 1]"); auto a0 = std::make_shared(f0_type, indices0, dict); auto a1 = std::make_shared(f1_type, indices1, dict); @@ -1253,22 +1243,18 @@ Status MakeDictionaryBatch(std::shared_ptr* out) { // construct batch auto schema = ::arrow::schema({field("dict1", f0_type), field("dict2", 
f1_type)}); - *out = RecordBatch::Make(schema, length, {a0, a1}); + *out = RecordBatch::Make(schema, 6, {a0, a1}); return Status::OK(); } -// A record batch writer implementation that supports manually specifying dictionaries. -class TestRecordBatchWriter : public RecordBatchWriter { +// A utility that supports reading/writing record batches, +// and manually specifying dictionaries. +class DictionaryBatchHelper { public: - explicit TestRecordBatchWriter(const Schema& schema) : schema_(schema) { + explicit DictionaryBatchHelper(const Schema& schema) : schema_(schema) { buffer_ = *AllocateResizableBuffer(0); sink_.reset(new io::BufferOutputStream(buffer_)); - payload_writer_ = *internal::NewPayloadStreamWriter(sink_.get()); - } - - Status WriteRecordBatch(const RecordBatch& batch) override { - // dumb implementation - return Status::OK(); + payload_writer_ = *internal::MakePayloadStreamWriter(sink_.get()); } Status Start() { @@ -1284,8 +1270,8 @@ class TestRecordBatchWriter : public RecordBatchWriter { Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr& dictionary, bool isDelta) { IpcPayload payload; - RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, - IpcWriteOptions::Defaults(), &payload, isDelta)); + RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, isDelta, dictionary, + IpcWriteOptions::Defaults(), &payload)); RETURN_NOT_OK(payload_writer_->WritePayload(payload)); return Status::OK(); } @@ -1297,7 +1283,7 @@ class TestRecordBatchWriter : public RecordBatchWriter { return payload_writer_->WritePayload(payload); } - Status Close() override { + Status Close() { RETURN_NOT_OK(payload_writer_->Close()); return sink_->Close(); } @@ -1322,25 +1308,24 @@ TEST(TestDictionaryBatch, CombineDictionary) { std::shared_ptr out_batch; ASSERT_OK(MakeDictionaryBatch(&in_batch)); - auto dict_ty = utf8(); - auto dict1 = ArrayFromJSON(dict_ty, "[\"foo\", \"bar\"]"); - auto dict2 = ArrayFromJSON(dict_ty, "[\"baz\"]"); + auto dict1 = ArrayFromJSON(utf8(), "[\"foo\", \"bar\"]"); + auto dict2 = ArrayFromJSON(utf8(), "[\"baz\"]"); - TestRecordBatchWriter writer(*in_batch->schema()); - ASSERT_OK(writer.Start()); + DictionaryBatchHelper helper(*in_batch->schema()); + ASSERT_OK(helper.Start()); // combine dictionaries to make up the // original dictionaries. 
- ASSERT_OK(writer.WriteDictionary(0L, dict1, false)); - ASSERT_OK(writer.WriteDictionary(0L, dict2, true)); + ASSERT_OK(helper.WriteDictionary(0L, dict1, false)); + ASSERT_OK(helper.WriteDictionary(0L, dict2, true)); - ASSERT_OK(writer.WriteDictionary(1L, dict1, false)); - ASSERT_OK(writer.WriteDictionary(1L, dict2, true)); + ASSERT_OK(helper.WriteDictionary(1L, dict1, false)); + ASSERT_OK(helper.WriteDictionary(1L, dict2, true)); - ASSERT_OK(writer.WriteBatchPayload(*in_batch)); - ASSERT_OK(writer.Close()); + ASSERT_OK(helper.WriteBatchPayload(*in_batch)); + ASSERT_OK(helper.Close()); - ASSERT_OK(writer.ReadBatch(&out_batch)); + ASSERT_OK(helper.ReadBatch(&out_batch)); ASSERT_BATCHES_EQUAL(*in_batch, *out_batch); } @@ -1350,26 +1335,25 @@ TEST(TestDictionaryBatch, ReplaceDictionary) { std::shared_ptr out_batch; ASSERT_OK(MakeDictionaryBatch(&in_batch)); - auto dict_ty = utf8(); - auto dict = ArrayFromJSON(dict_ty, "[\"foo\", \"bar\", \"baz\"]"); - auto dict1 = ArrayFromJSON(dict_ty, "[\"foo1\", \"bar1\", \"baz1\"]"); - auto dict2 = ArrayFromJSON(dict_ty, "[\"foo2\", \"bar2\", \"baz2\"]"); + auto dict = ArrayFromJSON(utf8(), "[\"foo\", \"bar\", \"baz\"]"); + auto dict1 = ArrayFromJSON(utf8(), "[\"foo1\", \"bar1\", \"baz1\"]"); + auto dict2 = ArrayFromJSON(utf8(), "[\"foo2\", \"bar2\", \"baz2\"]"); - TestRecordBatchWriter writer(*in_batch->schema()); - ASSERT_OK(writer.Start()); + DictionaryBatchHelper helper(*in_batch->schema()); + ASSERT_OK(helper.Start()); // the old dictionaries will be overwritten by // the new dictionaries with the same ids. - ASSERT_OK(writer.WriteDictionary(0L, dict1, false)); - ASSERT_OK(writer.WriteDictionary(0L, dict, false)); + ASSERT_OK(helper.WriteDictionary(0L, dict1, false)); + ASSERT_OK(helper.WriteDictionary(0L, dict, false)); - ASSERT_OK(writer.WriteDictionary(1L, dict2, false)); - ASSERT_OK(writer.WriteDictionary(1L, dict, false)); + ASSERT_OK(helper.WriteDictionary(1L, dict2, false)); + ASSERT_OK(helper.WriteDictionary(1L, dict, false)); - ASSERT_OK(writer.WriteBatchPayload(*in_batch)); - ASSERT_OK(writer.Close()); + ASSERT_OK(helper.WriteBatchPayload(*in_batch)); + ASSERT_OK(helper.Close()); - ASSERT_OK(writer.ReadBatch(&out_batch)); + ASSERT_OK(helper.ReadBatch(&out_batch)); ASSERT_BATCHES_EQUAL(*in_batch, *out_batch); } diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 6faab72d819..20cab041e1e 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -29,7 +29,6 @@ #include // IWYU pragma: export #include "arrow/array.h" -#include "arrow/array/concatenate.h" #include "arrow/buffer.h" #include "arrow/extension_type.h" #include "arrow/io/interfaces.h" @@ -686,18 +685,9 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo, } auto dictionary = batch->column(0); if (dictionary_batch->isDelta()) { - std::shared_ptr originalDict, combinedDict; - RETURN_NOT_OK(dictionary_memo->GetDictionary(id, &originalDict)); - ArrayVector dictsToCombine{originalDict, dictionary}; - ARROW_ASSIGN_OR_RAISE(combinedDict, Concatenate(dictsToCombine, options.memory_pool)); - return dictionary_memo->UpdateDictionary(id, combinedDict); - } - - if (dictionary_memo->HasDictionary(id)) { - return dictionary_memo->UpdateDictionary(id, dictionary); - } else { - return dictionary_memo->AddDictionary(id, dictionary); + return dictionary_memo->AddDictionaryDelta(id, dictionary, options.memory_pool); } + return dictionary_memo->AddOrReplaceDictionary(id, dictionary); } Status ParseDictionary(const Message& 
message, DictionaryMemo* dictionary_memo, diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index c2fe1f2f033..154da5ac17f 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -532,16 +532,16 @@ class RecordBatchSerializer { class DictionarySerializer : public RecordBatchSerializer { public: - DictionarySerializer(int64_t dictionary_id, int64_t buffer_start_offset, - const IpcWriteOptions& options, IpcPayload* out, bool isDelta) + DictionarySerializer(int64_t dictionary_id, bool isDelta, int64_t buffer_start_offset, + const IpcWriteOptions& options, IpcPayload* out) : RecordBatchSerializer(buffer_start_offset, options, out), dictionary_id_(dictionary_id), isDelta_(isDelta) {} Status SerializeMetadata(int64_t num_rows) override { - return WriteDictionaryMessage(dictionary_id_, num_rows, out_->body_length, + return WriteDictionaryMessage(dictionary_id_, isDelta_, num_rows, out_->body_length, custom_metadata_, field_nodes_, buffer_meta_, options_, - &out_->metadata, isDelta_); + &out_->metadata); } Status Assemble(const std::shared_ptr& dictionary) { @@ -601,12 +601,17 @@ Status GetSchemaPayload(const Schema& schema, const IpcWriteOptions& options, } Status GetDictionaryPayload(int64_t id, const std::shared_ptr& dictionary, - const IpcWriteOptions& options, IpcPayload* out, - bool isDelta) { + const IpcWriteOptions& options, IpcPayload* out) { + return GetDictionaryPayload(id, false, dictionary, options, out); +} + +Status GetDictionaryPayload(int64_t id, bool isDelta, + const std::shared_ptr& dictionary, + const IpcWriteOptions& options, IpcPayload* out) { out->type = MessageType::DICTIONARY_BATCH; // Frame of reference is 0, see ARROW-384 - internal::DictionarySerializer assembler(id, /*buffer_start_offset=*/0, options, out, - isDelta); + internal::DictionarySerializer assembler(id, isDelta, /*buffer_start_offset=*/0, + options, out); return assembler.Assemble(dictionary); } @@ -1217,12 +1222,12 @@ Result> OpenRecordBatchWriter( schema, options); } -Result> NewPayloadStreamWriter( +Result> MakePayloadStreamWriter( io::OutputStream* sink, const IpcWriteOptions& options) { return ::arrow::internal::make_unique(sink, options); } -Result> NewPayloadFileWriter( +Result> MakePayloadFileWriter( io::OutputStream* sink, const std::shared_ptr& schema, const IpcWriteOptions& options, const std::shared_ptr& metadata) { diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index c698516ad7a..8fd2ae97117 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -292,12 +292,22 @@ Status GetSchemaPayload(const Schema& schema, const IpcWriteOptions& options, /// \param[in] dictionary the dictionary values /// \param[in] options options for serialization /// \param[out] payload the output IpcPayload -/// \param[in] isDelta whether the dictionary is a delta dictionary /// \return Status ARROW_EXPORT Status GetDictionaryPayload(int64_t id, const std::shared_ptr& dictionary, - const IpcWriteOptions& options, IpcPayload* payload, - bool isDelta = false); + const IpcWriteOptions& options, IpcPayload* payload); + +/// \brief Compute IpcPayload for a dictionary +/// \param[in] id the dictionary id +/// \param[in] isDelta whether the dictionary is a delta dictionary +/// \param[in] dictionary the dictionary values +/// \param[in] options options for serialization +/// \param[out] payload the output IpcPayload +/// \return Status +ARROW_EXPORT +Status GetDictionaryPayload(int64_t id, bool isDelta, + const std::shared_ptr& dictionary, + 
const IpcWriteOptions& options, IpcPayload* payload); /// \brief Compute IpcPayload for the given record batch /// \param[in] batch the RecordBatch that is being serialized @@ -350,7 +360,7 @@ class ARROW_EXPORT IpcPayloadWriter { /// \param[in] options options for serialization /// \return Result> ARROW_EXPORT -Result> NewPayloadStreamWriter( +Result> MakePayloadStreamWriter( io::OutputStream* sink, const IpcWriteOptions& options = IpcWriteOptions::Defaults()); /// Create a new IPC payload file writer from stream sink. @@ -361,7 +371,7 @@ Result> NewPayloadStreamWriter( /// \param[in] metadata custom metadata for File Footer, optional /// \return Status ARROW_EXPORT -Result> NewPayloadFileWriter( +Result> MakePayloadFileWriter( io::OutputStream* sink, const std::shared_ptr& schema, const IpcWriteOptions& options = IpcWriteOptions::Defaults(), const std::shared_ptr& metadata = NULLPTR); From 83f2d297326e180e1a7ecda41bd9ca183bacafdd Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 1 Jul 2020 14:14:28 +0200 Subject: [PATCH 3/3] Add one test --- cpp/src/arrow/ipc/metadata_internal.cc | 4 +-- cpp/src/arrow/ipc/metadata_internal.h | 3 +- cpp/src/arrow/ipc/read_write_test.cc | 49 ++++++++++++++++++-------- cpp/src/arrow/ipc/writer.cc | 12 +++---- cpp/src/arrow/ipc/writer.h | 4 +-- 5 files changed, 47 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index 4ce4a5d5b40..dd642d5bd65 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -1209,7 +1209,7 @@ Result> WriteSparseTensorMessage( } Status WriteDictionaryMessage( - int64_t id, bool isDelta, int64_t length, int64_t body_length, + int64_t id, bool is_delta, int64_t length, int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, const IpcWriteOptions& options, std::shared_ptr* out) { @@ -1218,7 +1218,7 @@ Status WriteDictionaryMessage( RETURN_NOT_OK( MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch)); auto dictionary_batch = - flatbuf::CreateDictionaryBatch(fbb, id, record_batch, isDelta).Union(); + flatbuf::CreateDictionaryBatch(fbb, id, record_batch, is_delta).Union(); return WriteFBMessage(fbb, flatbuf::MessageHeader::DictionaryBatch, dictionary_batch, body_length, custom_metadata) .Value(out); diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h index 1cfd774dd6e..5c1a032042b 100644 --- a/cpp/src/arrow/ipc/metadata_internal.h +++ b/cpp/src/arrow/ipc/metadata_internal.h @@ -195,7 +195,8 @@ Status WriteFileFooter(const Schema& schema, const std::vector& dicti io::OutputStream* out); Status WriteDictionaryMessage( - const int64_t id, const bool isDelta, const int64_t length, const int64_t body_length, + const int64_t id, const bool is_delta, const int64_t length, + const int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, const IpcWriteOptions& options, std::shared_ptr* out); diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index ad447339ef8..374cf9deacb 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -1268,9 +1268,9 @@ class DictionaryBatchHelper { } Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr& dictionary, - bool isDelta) { + bool is_delta) { IpcPayload payload; - RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, 
isDelta, dictionary, + RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, is_delta, dictionary, IpcWriteOptions::Defaults(), &payload)); RETURN_NOT_OK(payload_writer_->WritePayload(payload)); return Status::OK(); @@ -1303,7 +1303,7 @@ class DictionaryBatchHelper { std::unique_ptr sink_; }; -TEST(TestDictionaryBatch, CombineDictionary) { +TEST(TestDictionaryBatch, DictionaryDelta) { std::shared_ptr in_batch; std::shared_ptr out_batch; ASSERT_OK(MakeDictionaryBatch(&in_batch)); @@ -1314,13 +1314,11 @@ TEST(TestDictionaryBatch, CombineDictionary) { DictionaryBatchHelper helper(*in_batch->schema()); ASSERT_OK(helper.Start()); - // combine dictionaries to make up the - // original dictionaries. - ASSERT_OK(helper.WriteDictionary(0L, dict1, false)); - ASSERT_OK(helper.WriteDictionary(0L, dict2, true)); + ASSERT_OK(helper.WriteDictionary(0L, dict1, /*is_delta=*/false)); + ASSERT_OK(helper.WriteDictionary(0L, dict2, /*is_delta=*/true)); - ASSERT_OK(helper.WriteDictionary(1L, dict1, false)); - ASSERT_OK(helper.WriteDictionary(1L, dict2, true)); + ASSERT_OK(helper.WriteDictionary(1L, dict1, /*is_delta=*/false)); + ASSERT_OK(helper.WriteDictionary(1L, dict2, /*is_delta=*/true)); ASSERT_OK(helper.WriteBatchPayload(*in_batch)); ASSERT_OK(helper.Close()); @@ -1330,7 +1328,30 @@ TEST(TestDictionaryBatch, CombineDictionary) { ASSERT_BATCHES_EQUAL(*in_batch, *out_batch); } -TEST(TestDictionaryBatch, ReplaceDictionary) { +TEST(TestDictionaryBatch, DictionaryDeltaWithUnknownId) { + std::shared_ptr in_batch; + std::shared_ptr out_batch; + ASSERT_OK(MakeDictionaryBatch(&in_batch)); + + auto dict1 = ArrayFromJSON(utf8(), "[\"foo\", \"bar\"]"); + auto dict2 = ArrayFromJSON(utf8(), "[\"baz\"]"); + + DictionaryBatchHelper helper(*in_batch->schema()); + ASSERT_OK(helper.Start()); + + ASSERT_OK(helper.WriteDictionary(0L, dict1, /*is_delta=*/false)); + ASSERT_OK(helper.WriteDictionary(0L, dict2, /*is_delta=*/true)); + + /* This delta dictionary does not have a base dictionary previously in stream */ + ASSERT_OK(helper.WriteDictionary(1L, dict2, /*is_delta=*/true)); + + ASSERT_OK(helper.WriteBatchPayload(*in_batch)); + ASSERT_OK(helper.Close()); + + ASSERT_RAISES(KeyError, helper.ReadBatch(&out_batch)); +} + +TEST(TestDictionaryBatch, DictionaryReplacement) { std::shared_ptr in_batch; std::shared_ptr out_batch; ASSERT_OK(MakeDictionaryBatch(&in_batch)); @@ -1344,11 +1365,11 @@ TEST(TestDictionaryBatch, ReplaceDictionary) { // the old dictionaries will be overwritten by // the new dictionaries with the same ids. 
- ASSERT_OK(helper.WriteDictionary(0L, dict1, false)); - ASSERT_OK(helper.WriteDictionary(0L, dict, false)); + ASSERT_OK(helper.WriteDictionary(0L, dict1, /*is_delta=*/false)); + ASSERT_OK(helper.WriteDictionary(0L, dict, /*is_delta=*/false)); - ASSERT_OK(helper.WriteDictionary(1L, dict2, false)); - ASSERT_OK(helper.WriteDictionary(1L, dict, false)); + ASSERT_OK(helper.WriteDictionary(1L, dict2, /*is_delta=*/false)); + ASSERT_OK(helper.WriteDictionary(1L, dict, /*is_delta=*/false)); ASSERT_OK(helper.WriteBatchPayload(*in_batch)); ASSERT_OK(helper.Close()); diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 154da5ac17f..4db61364383 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -532,14 +532,14 @@ class RecordBatchSerializer { class DictionarySerializer : public RecordBatchSerializer { public: - DictionarySerializer(int64_t dictionary_id, bool isDelta, int64_t buffer_start_offset, + DictionarySerializer(int64_t dictionary_id, bool is_delta, int64_t buffer_start_offset, const IpcWriteOptions& options, IpcPayload* out) : RecordBatchSerializer(buffer_start_offset, options, out), dictionary_id_(dictionary_id), - isDelta_(isDelta) {} + is_delta_(is_delta) {} Status SerializeMetadata(int64_t num_rows) override { - return WriteDictionaryMessage(dictionary_id_, isDelta_, num_rows, out_->body_length, + return WriteDictionaryMessage(dictionary_id_, is_delta_, num_rows, out_->body_length, custom_metadata_, field_nodes_, buffer_meta_, options_, &out_->metadata); } @@ -553,7 +553,7 @@ class DictionarySerializer : public RecordBatchSerializer { private: int64_t dictionary_id_; - bool isDelta_; + bool is_delta_; }; } // namespace internal @@ -605,12 +605,12 @@ Status GetDictionaryPayload(int64_t id, const std::shared_ptr& dictionary return GetDictionaryPayload(id, false, dictionary, options, out); } -Status GetDictionaryPayload(int64_t id, bool isDelta, +Status GetDictionaryPayload(int64_t id, bool is_delta, const std::shared_ptr& dictionary, const IpcWriteOptions& options, IpcPayload* out) { out->type = MessageType::DICTIONARY_BATCH; // Frame of reference is 0, see ARROW-384 - internal::DictionarySerializer assembler(id, isDelta, /*buffer_start_offset=*/0, + internal::DictionarySerializer assembler(id, is_delta, /*buffer_start_offset=*/0, options, out); return assembler.Assemble(dictionary); } diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index 8fd2ae97117..9fd782ae1af 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -299,13 +299,13 @@ Status GetDictionaryPayload(int64_t id, const std::shared_ptr& dictionary /// \brief Compute IpcPayload for a dictionary /// \param[in] id the dictionary id -/// \param[in] isDelta whether the dictionary is a delta dictionary +/// \param[in] is_delta whether the dictionary is a delta dictionary /// \param[in] dictionary the dictionary values /// \param[in] options options for serialization /// \param[out] payload the output IpcPayload /// \return Status ARROW_EXPORT -Status GetDictionaryPayload(int64_t id, bool isDelta, +Status GetDictionaryPayload(int64_t id, bool is_delta, const std::shared_ptr& dictionary, const IpcWriteOptions& options, IpcPayload* payload);
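
Taken together, the three patches let a producer emit an initial dictionary, any number of delta or replacement dictionaries, and then the record batches that reference them, all through the internal payload API. The sketch below shows that flow end to end. It is an illustration only, written against the signatures introduced above (internal::MakePayloadStreamWriter, GetSchemaPayload, the GetDictionaryPayload overload taking is_delta, GetRecordBatchPayload); the function name WriteStreamWithDelta and its parameters are hypothetical, and it assumes the same arrow::ipc namespace and using context as the new tests.

// Sketch only: emit the schema, an initial dictionary for id 0, a delta for
// the same id, then one record batch, via the payload API from this series.
// WriteStreamWithDelta and its arguments are hypothetical names.
Status WriteStreamWithDelta(io::OutputStream* sink,
                            const std::shared_ptr<Schema>& schema,
                            const std::shared_ptr<Array>& base_dict,
                            const std::shared_ptr<Array>& delta_dict,
                            const std::shared_ptr<RecordBatch>& batch) {
  auto options = IpcWriteOptions::Defaults();
  ARROW_ASSIGN_OR_RAISE(auto payload_writer,
                        internal::MakePayloadStreamWriter(sink, options));
  RETURN_NOT_OK(payload_writer->Start());

  DictionaryMemo memo;
  IpcPayload payload;

  // The schema message always comes first.
  RETURN_NOT_OK(GetSchemaPayload(*schema, options, &memo, &payload));
  RETURN_NOT_OK(payload_writer->WritePayload(payload));

  // Initial dictionary for id 0, followed by a delta that appends to it.
  RETURN_NOT_OK(
      GetDictionaryPayload(0, /*is_delta=*/false, base_dict, options, &payload));
  RETURN_NOT_OK(payload_writer->WritePayload(payload));
  RETURN_NOT_OK(
      GetDictionaryPayload(0, /*is_delta=*/true, delta_dict, options, &payload));
  RETURN_NOT_OK(payload_writer->WritePayload(payload));

  // Finally, a record batch whose dictionary-encoded column references id 0.
  RETURN_NOT_OK(GetRecordBatchPayload(*batch, options, &payload));
  RETURN_NOT_OK(payload_writer->WritePayload(payload));

  return payload_writer->Close();
}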
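On the read side, ReadDictionary now distinguishes two cases: a DictionaryBatch flagged as a delta is appended to the dictionary already stored in the memo, while a non-delta batch for an id that is already present replaces it. The short sketch below restates that rule outside the reader, using only the DictionaryMemo methods added in this series; the helper name ApplyDictionaryBatch and its parameters are invented for illustration, and it again assumes the arrow::ipc namespace.

// Restates the memo update rule from ReadDictionary; not the reader's actual
// code path. ApplyDictionaryBatch is a hypothetical helper.
Status ApplyDictionaryBatch(DictionaryMemo* memo, int64_t id,
                            const std::shared_ptr<Array>& values, bool is_delta,
                            MemoryPool* pool) {
  if (is_delta) {
    // Existing entries keep their indices; the delta's values are appended
    // after them. This fails with KeyError if no dictionary with this id has
    // been seen yet (covered by the DictionaryDeltaWithUnknownId test).
    return memo->AddDictionaryDelta(id, values, pool);
  }
  // A non-delta message either registers a new dictionary or replaces the
  // previous one wholesale (dictionary replacement).
  return memo->AddOrReplaceDictionary(id, values);
}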
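Consumers need no new API: RecordBatchStreamReader::ReadNext now loops over any run of dictionary messages (initial, replacement, or delta) before decoding the next record batch, so reading back a stream produced as above is unchanged. A minimal read-back sketch follows, assuming the encapsulated stream bytes are already in an in-memory Buffer; the helper name ReadFirstBatch is hypothetical.

// Reads the first record batch from an IPC stream held in memory. Any
// dictionary messages preceding it, including deltas, are applied by the
// stream reader before the batch is decoded.
Status ReadFirstBatch(const std::shared_ptr<Buffer>& stream_bytes,
                      std::shared_ptr<RecordBatch>* out) {
  auto input = std::make_shared<io::BufferReader>(stream_bytes);
  ARROW_ASSIGN_OR_RAISE(
      auto reader, RecordBatchStreamReader::Open(input, IpcReadOptions::Defaults()));
  return reader->ReadNext(out);
}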