From 45814295691228a57aec89c22992cca084f7ad59 Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Tue, 1 Dec 2020 16:12:47 +0100
Subject: [PATCH 1/2] ARROW-6883: [C++][Python] Allow writing dictionary deltas

* Add an ipc::IpcWriteOptions member to govern emission of dictionary
  deltas. If the option is enabled, deltas are detected by checking
  whether the new dictionary starts with the last emitted one for the
  same field. However, for nested dictionaries, deltas are not emitted
  for the outer dictionary, as the read path doesn't support it.

* Add a stats() method to ipc::StreamDecoder

* Expose the IPC statistics in Python, and add tests
---
 cpp/src/arrow/array/array_base.cc    |  18 +--
 cpp/src/arrow/array/array_base.h     |  12 +-
 cpp/src/arrow/flight/client.cc       |   8 ++
 cpp/src/arrow/flight/server.cc       |   9 +-
 cpp/src/arrow/ipc/options.h          |  14 +++
 cpp/src/arrow/ipc/read_write_test.cc | 164 +++++++++++++++++++++++++--
 cpp/src/arrow/ipc/reader.cc          |  44 ++++---
 cpp/src/arrow/ipc/reader.h           |   5 +-
 cpp/src/arrow/ipc/writer.cc          |  76 ++++++++++---
 cpp/src/arrow/ipc/writer.h           |  20 ++++
 python/pyarrow/includes/libarrow.pxd |  35 ++++--
 python/pyarrow/ipc.pxi               |  82 +++++++++++++-
 python/pyarrow/ipc.py                |   3 +-
 python/pyarrow/tests/test_ipc.py     |  56 +++++++++
 14 files changed, 485 insertions(+), 61 deletions(-)

diff --git a/cpp/src/arrow/array/array_base.cc b/cpp/src/arrow/array/array_base.cc
index 265f3d91372..67c5ca84e1f 100644
--- a/cpp/src/arrow/array/array_base.cc
+++ b/cpp/src/arrow/array/array_base.cc
@@ -222,29 +222,31 @@ bool Array::ApproxEquals(const std::shared_ptr<Array>& arr,
 }
 
 bool Array::RangeEquals(const Array& other, int64_t start_idx, int64_t end_idx,
-                        int64_t other_start_idx) const {
-  return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx);
+                        int64_t other_start_idx, const EqualOptions& opts) const {
+  return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx, opts);
 }
 
 bool Array::RangeEquals(const std::shared_ptr<Array>& other, int64_t start_idx,
-                        int64_t end_idx, int64_t other_start_idx) const {
+                        int64_t end_idx, int64_t other_start_idx,
+                        const EqualOptions& opts) const {
   if (!other) {
     return false;
   }
-  return ArrayRangeEquals(*this, *other, start_idx, end_idx, other_start_idx);
+  return ArrayRangeEquals(*this, *other, start_idx, end_idx, other_start_idx, opts);
 }
 
 bool Array::RangeEquals(int64_t start_idx, int64_t end_idx, int64_t other_start_idx,
-                        const Array& other) const {
-  return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx);
+                        const Array& other, const EqualOptions& opts) const {
+  return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx, opts);
 }
 
 bool Array::RangeEquals(int64_t start_idx, int64_t end_idx, int64_t other_start_idx,
-                        const std::shared_ptr<Array>& other) const {
+                        const std::shared_ptr<Array>& other,
+                        const EqualOptions& opts) const {
   if (!other) {
     return false;
   }
-  return ArrayRangeEquals(*this, *other, start_idx, end_idx, other_start_idx);
+  return ArrayRangeEquals(*this, *other, start_idx, end_idx, other_start_idx, opts);
 }
 
 std::shared_ptr<Array> Array::Slice(int64_t offset, int64_t length) const {
diff --git a/cpp/src/arrow/array/array_base.h b/cpp/src/arrow/array/array_base.h
index ef0b1b626e3..9bcd1621840 100644
--- a/cpp/src/arrow/array/array_base.h
+++ b/cpp/src/arrow/array/array_base.h
@@ -119,13 +119,17 @@ class ARROW_EXPORT Array {
   /// Compare if the range of slots specified are equal for the given array and
   /// this array. end_idx exclusive. This method does not bounds check.
  bool RangeEquals(int64_t start_idx, int64_t end_idx, int64_t other_start_idx,
-                  const Array& other) const;
+                  const Array& other,
+                  const EqualOptions& = EqualOptions::Defaults()) const;
  bool RangeEquals(int64_t start_idx, int64_t end_idx, int64_t other_start_idx,
-                  const std::shared_ptr<Array>& other) const;
+                  const std::shared_ptr<Array>& other,
+                  const EqualOptions& = EqualOptions::Defaults()) const;
  bool RangeEquals(const Array& other, int64_t start_idx, int64_t end_idx,
-                  int64_t other_start_idx) const;
+                  int64_t other_start_idx,
+                  const EqualOptions& = EqualOptions::Defaults()) const;
  bool RangeEquals(const std::shared_ptr<Array>& other, int64_t start_idx,
-                  int64_t end_idx, int64_t other_start_idx) const;
+                  int64_t end_idx, int64_t other_start_idx,
+                  const EqualOptions& = EqualOptions::Defaults()) const;
 
   Status Accept(ArrayVisitor* visitor) const;
 
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index 5c56e6409a7..f42fbdaa9cf 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -664,12 +664,14 @@ class GrpcStreamWriter : public FlightStreamWriter {
     }
     return Status::OK();
   }
+
   Status WriteWithMetadata(const RecordBatch& batch,
                            std::shared_ptr<Buffer> app_metadata) override {
     RETURN_NOT_OK(CheckStarted());
     app_metadata_ = app_metadata;
     return batch_writer_->WriteRecordBatch(batch);
   }
+
   Status DoneWriting() override {
     // Do not CheckStarted - DoneWriting applies to data and metadata
     if (batch_writer_) {
@@ -683,6 +685,7 @@ class GrpcStreamWriter : public FlightStreamWriter {
     }
     return writer_->DoneWriting();
   }
+
   Status Close() override {
     // Do not CheckStarted - Close applies to data and metadata
     if (batch_writer_ && !writer_closed_) {
@@ -698,6 +701,11 @@ class GrpcStreamWriter : public FlightStreamWriter {
     return writer_->Finish(Status::OK());
   }
 
+  ipc::WriteStats stats() const override {
+    ARROW_CHECK_NE(batch_writer_, nullptr);
+    return batch_writer_->stats();
+  }
+
  private:
   friend class DoPutPayloadWriter;
   std::shared_ptr<Buffer> app_metadata_;
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index 87c96ce4910..4e35950c871 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -314,7 +314,9 @@ class DoExchangeMessageWriter : public FlightMessageWriter {
       payload.app_metadata = app_metadata;
     }
     RETURN_NOT_OK(ipc::GetRecordBatchPayload(batch, ipc_options_, &payload.ipc_message));
-    return WritePayload(payload);
+    RETURN_NOT_OK(WritePayload(payload));
+    ++stats_.num_record_batches;
+    return Status::OK();
   }
 
   Status Close() override {
@@ -322,12 +324,15 @@ class DoExchangeMessageWriter : public FlightMessageWriter {
     return Status::OK();
   }
 
+  ipc::WriteStats stats() const override { return stats_; }
+
  private:
   Status WritePayload(const FlightPayload& payload) {
     if (!internal::WritePayload(payload, stream_)) {
       // gRPC doesn't give us any way to find what the error was (if any).
      return Status::IOError("Could not write payload to stream");
    }
+    ++stats_.num_messages;
    return Status::OK();
  }
@@ -350,6 +355,7 @@ class DoExchangeMessageWriter : public FlightMessageWriter {
     RETURN_NOT_OK(ipc::GetDictionaryPayload(pair.first, pair.second, ipc_options_,
                                             &payload.ipc_message));
     RETURN_NOT_OK(WritePayload(payload));
+    ++stats_.num_dictionary_batches;
   }
   return Status::OK();
 }
@@ -357,6 +363,7 @@ class DoExchangeMessageWriter : public FlightMessageWriter {
   grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* stream_;
   ::arrow::ipc::IpcWriteOptions ipc_options_;
   ipc::DictionaryFieldMapper mapper_;
+  ipc::WriteStats stats_;
   bool started_ = false;
   bool dictionaries_written_ = false;
 };
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index bf535cdacf3..83a3c2d0330 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -65,6 +65,20 @@ struct ARROW_EXPORT IpcWriteOptions {
   /// like compression
   bool use_threads = true;
 
+  /// \brief Whether to emit dictionary deltas
+  ///
+  /// If false, a changed dictionary for a given field will emit a full
+  /// dictionary replacement.
+  /// If true, a changed dictionary will be compared against the previous
+  /// version. If possible, a dictionary delta will be emitted, otherwise
+  /// a full dictionary replacement.
+  ///
+  /// Default is false to maximize stream compatibility.
+  ///
+  /// Also, note that if a changed dictionary is a nested dictionary,
+  /// then a delta is never emitted, for compatibility with the read path.
+  bool emit_dictionary_deltas = false;
+
   /// \brief Format version to use for IPC messages and their metadata.
   ///
   /// Presently using V5 version (readable by 1.0.0 and later).
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 2e1e6ad615f..3180a6880b2 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -929,8 +929,11 @@ struct FileWriterHelper {
     return Status::OK();
   }
 
-  Status Finish() {
+  Status Finish(WriteStats* out_stats = nullptr) {
     RETURN_NOT_OK(writer_->Close());
+    if (out_stats) {
+      *out_stats = writer_->stats();
+    }
     RETURN_NOT_OK(sink_->Close());
     // Current offset into stream is the end of the file
     return sink_->Tell().Value(&footer_offset_);
@@ -997,8 +1000,11 @@ struct StreamWriterHelper {
     return Status::OK();
   }
 
-  Status Finish() {
+  Status Finish(WriteStats* out_stats = nullptr) {
     RETURN_NOT_OK(writer_->Close());
+    if (out_stats) {
+      *out_stats = writer_->stats();
+    }
     return sink_->Close();
   }
 
@@ -1034,13 +1040,13 @@ struct StreamDecoderWriterHelper : public StreamWriterHelper {
   Status ReadBatches(const IpcReadOptions& options, BatchVector* out_batches,
                      ReadStats* out_stats = nullptr) override {
-    if (out_stats) {
-      return Status::NotImplemented("StreamDecoder does not support stats()");
-    }
     auto listener = std::make_shared<CollectListener>();
     StreamDecoder decoder(listener, options);
     RETURN_NOT_OK(DoConsume(&decoder));
     *out_batches = listener->record_batches();
+    if (out_stats) {
+      *out_stats = decoder.stats();
+    }
     return Status::OK();
   }
 
@@ -1763,6 +1769,40 @@ class TestDictionaryReplacement : public ::testing::Test {
     EXPECT_EQ(read_stats_.num_dictionary_deltas, 0);
   }
 
+  void TestDeltaDict() {
+    auto type = dictionary(int8(), utf8());
+    auto batch1 = MakeBatch(ArrayFromJSON(type, R"(["foo", "foo", "bar", null])"));
+    // Potential delta
+    auto batch2 = MakeBatch(ArrayFromJSON(type, R"(["foo", "bar", "quux", "foo"])"));
+    // Potential delta
+    auto batch3 =
+        MakeBatch(ArrayFromJSON(type, R"(["foo", "bar", "quux", "zzz", "foo"])"));
+    auto batch4 = MakeBatch(ArrayFromJSON(type, R"(["bar", null, "quux", "foo"])"));
+    BatchVector batches{batch1, batch2, batch3, batch4};
+
+    if (WriterHelper::kIsFileFormat) {
+      CheckWritingFails(batches, 1);
+    } else {
+      CheckRoundtrip(batches);
+      EXPECT_EQ(read_stats_.num_messages, 9);  // including schema message
+      EXPECT_EQ(read_stats_.num_record_batches, 4);
+      EXPECT_EQ(read_stats_.num_dictionary_batches, 4);
+      EXPECT_EQ(read_stats_.num_replaced_dictionaries, 3);
+      EXPECT_EQ(read_stats_.num_dictionary_deltas, 0);
+    }
+
+    write_options_.emit_dictionary_deltas = true;
+    if (WriterHelper::kIsFileFormat) {
+      CheckWritingFails(batches, 1);
+    } else {
+      CheckRoundtrip(batches);
+      EXPECT_EQ(read_stats_.num_messages, 9);  // including schema message
+      EXPECT_EQ(read_stats_.num_record_batches, 4);
+      EXPECT_EQ(read_stats_.num_dictionary_batches, 4);
+      EXPECT_EQ(read_stats_.num_replaced_dictionaries, 1);
+      EXPECT_EQ(read_stats_.num_dictionary_deltas, 2);
+    }
+  }
+
   void TestSameDictValuesNested() {
     CheckRoundtrip(SameValuesNestedDictBatches());
 
@@ -1821,13 +1861,96 @@ class TestDictionaryReplacement : public ::testing::Test {
     EXPECT_EQ(read_stats_.num_dictionary_deltas, 0);
   }
 
+  void TestDeltaDictNestedOuter() {
+    // Outer dict changes, inner dict remains the same
+    auto value_type = list(dictionary(int8(), utf8()));
+    auto type = dictionary(int8(), value_type);
+    // Inner dict: ["a", "b"]
+    auto batch1_values = ArrayFromJSON(value_type, R"([["a"], ["b"]])");
+    // Potential delta
+    auto batch2_values = ArrayFromJSON(value_type, R"([["a"], ["b"], ["a", "a"]])");
+    auto batch1 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 0, 1]"), batch1_values);
+    auto batch2 =
+        MakeBatch(type, ArrayFromJSON(int8(), "[2, null, 0, 0]"), batch2_values);
+    BatchVector batches{batch1, batch2};
+
+    if (WriterHelper::kIsFileFormat) {
+      CheckWritingFails(batches, 1);
+    } else {
+      CheckRoundtrip(batches);
+      EXPECT_EQ(read_stats_.num_messages, 6);  // including schema message
+      EXPECT_EQ(read_stats_.num_record_batches, 2);
+      EXPECT_EQ(read_stats_.num_dictionary_batches, 3);
+      EXPECT_EQ(read_stats_.num_replaced_dictionaries, 1);
+      EXPECT_EQ(read_stats_.num_dictionary_deltas, 0);
+    }
+
+    write_options_.emit_dictionary_deltas = true;
+    if (WriterHelper::kIsFileFormat) {
+      CheckWritingFails(batches, 1);
+    } else {
+      // Outer dict deltas are not emitted as the read path doesn't support them
+      CheckRoundtrip(batches);
+      EXPECT_EQ(read_stats_.num_messages, 6);  // including schema message
+      EXPECT_EQ(read_stats_.num_record_batches, 2);
+      EXPECT_EQ(read_stats_.num_dictionary_batches, 3);
+      EXPECT_EQ(read_stats_.num_replaced_dictionaries, 1);
+      EXPECT_EQ(read_stats_.num_dictionary_deltas, 0);
+    }
+  }
+
+  void TestDeltaDictNestedInner() {
+    // Inner dict changes
+    auto value_type = list(dictionary(int8(), utf8()));
+    auto type = dictionary(int8(), value_type);
+    // Inner dict: ["a"]
+    auto batch1_values = ArrayFromJSON(value_type, R"([["a"]])");
+    // Inner dict: ["a", "b"] => potential delta
+    auto batch2_values = ArrayFromJSON(value_type, R"([["a"], ["b"], ["a", "a"]])");
+    // Inner dict: ["a", "b", "c"] => potential delta
+    auto batch3_values = ArrayFromJSON(value_type, R"([["a"], ["b"], ["c"]])");
+    // Inner dict: ["a", "b", "c"]
+    auto batch4_values = ArrayFromJSON(value_type, R"([["a"], ["b", "c"]])");
+    // Inner dict: ["a", "c", "b"] => replacement
+    auto batch5_values = ArrayFromJSON(value_type, R"([["a"], ["c"], ["b"]])");
+    auto batch1 = MakeBatch(type, ArrayFromJSON(int8(), "[0, null, 0]"),
batch1_values); + auto batch2 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 0, 2]"), batch2_values); + auto batch3 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 0, 2]"), batch3_values); + auto batch4 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 0, null]"), batch4_values); + auto batch5 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 0, 2]"), batch5_values); + BatchVector batches{batch1, batch2, batch3, batch4, batch5}; + + if (WriterHelper::kIsFileFormat) { + CheckWritingFails(batches, 1); + } else { + CheckRoundtrip(batches); + EXPECT_EQ(read_stats_.num_messages, 15); // including schema message + EXPECT_EQ(read_stats_.num_record_batches, 5); + EXPECT_EQ(read_stats_.num_dictionary_batches, 9); // 4 inner + 5 outer + EXPECT_EQ(read_stats_.num_replaced_dictionaries, 7); + EXPECT_EQ(read_stats_.num_dictionary_deltas, 0); + } + + write_options_.emit_dictionary_deltas = true; + if (WriterHelper::kIsFileFormat) { + CheckWritingFails(batches, 1); + } else { + CheckRoundtrip(batches); + EXPECT_EQ(read_stats_.num_messages, 15); // including schema message + EXPECT_EQ(read_stats_.num_record_batches, 5); + EXPECT_EQ(read_stats_.num_dictionary_batches, 9); // 4 inner + 5 outer + EXPECT_EQ(read_stats_.num_replaced_dictionaries, 5); + EXPECT_EQ(read_stats_.num_dictionary_deltas, 2); + } + } + Status RoundTrip(const BatchVector& in_batches, BatchVector* out_batches) { WriterHelper writer_helper; RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema(), write_options_)); for (const auto& batch : in_batches) { RETURN_NOT_OK(writer_helper.WriteBatch(batch)); } - RETURN_NOT_OK(writer_helper.Finish()); + RETURN_NOT_OK(writer_helper.Finish(&write_stats_)); RETURN_NOT_OK(writer_helper.ReadBatches(read_options_, out_batches, &read_stats_)); for (const auto& batch : *out_batches) { RETURN_NOT_OK(batch->ValidateFull()); @@ -1838,6 +1961,7 @@ class TestDictionaryReplacement : public ::testing::Test { void CheckRoundtrip(const BatchVector& in_batches) { BatchVector out_batches; ASSERT_OK(RoundTrip(in_batches, &out_batches)); + CheckStatsConsistent(); ASSERT_EQ(in_batches.size(), out_batches.size()); for (size_t i = 0; i < in_batches.size(); ++i) { AssertBatchesEqual(*in_batches[i], *out_batches[i]); @@ -1853,6 +1977,15 @@ class TestDictionaryReplacement : public ::testing::Test { ASSERT_RAISES(Invalid, writer_helper.WriteBatch(in_batches[fails_at_batch_num])); } + void CheckStatsConsistent() { + ASSERT_EQ(read_stats_.num_messages, write_stats_.num_messages); + ASSERT_EQ(read_stats_.num_record_batches, write_stats_.num_record_batches); + ASSERT_EQ(read_stats_.num_dictionary_batches, write_stats_.num_dictionary_batches); + ASSERT_EQ(read_stats_.num_replaced_dictionaries, + write_stats_.num_replaced_dictionaries); + ASSERT_EQ(read_stats_.num_dictionary_deltas, write_stats_.num_dictionary_deltas); + } + BatchVector DifferentOrderDictBatches() { // Create two separate dictionaries with different order auto type = dictionary(int8(), utf8()); @@ -1919,6 +2052,7 @@ class TestDictionaryReplacement : public ::testing::Test { protected: IpcWriteOptions write_options_ = IpcWriteOptions::Defaults(); IpcReadOptions read_options_ = IpcReadOptions::Defaults(); + WriteStats write_stats_; ReadStats read_stats_; }; @@ -1928,6 +2062,8 @@ TYPED_TEST_P(TestDictionaryReplacement, SameDictPointer) { this->TestSameDictPoi TYPED_TEST_P(TestDictionaryReplacement, SameDictValues) { this->TestSameDictValues(); } +TYPED_TEST_P(TestDictionaryReplacement, DeltaDict) { this->TestDeltaDict(); } + TYPED_TEST_P(TestDictionaryReplacement, 
SameDictValuesNested) {
  this->TestSameDictValuesNested();
}
@@ -1940,12 +2076,22 @@ TYPED_TEST_P(TestDictionaryReplacement, DifferentDictValuesNested) {
   this->TestDifferentDictValuesNested();
 }
 
+TYPED_TEST_P(TestDictionaryReplacement, DeltaDictNestedOuter) {
+  this->TestDeltaDictNestedOuter();
+}
+
+TYPED_TEST_P(TestDictionaryReplacement, DeltaDictNestedInner) {
+  this->TestDeltaDictNestedInner();
+}
+
 REGISTER_TYPED_TEST_SUITE_P(TestDictionaryReplacement, SameDictPointer, SameDictValues,
-                            SameDictValuesNested, DifferentDictValues,
-                            DifferentDictValuesNested);
+                            DeltaDict, SameDictValuesNested, DifferentDictValues,
+                            DifferentDictValuesNested, DeltaDictNestedOuter,
+                            DeltaDictNestedInner);
 
 using DictionaryReplacementTestTypes =
-    ::testing::Types<FileWriterHelper, StreamWriterHelper>;
+    ::testing::Types<FileWriterHelper, StreamWriterHelper,
+                     StreamDecoderWriterHelper>;
 
 INSTANTIATE_TYPED_TEST_SUITE_P(TestDictionaryReplacement, TestDictionaryReplacement,
                                DictionaryReplacementTestTypes);
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 92f2b70f294..3d855425c7a 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1142,20 +1142,16 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener {
   };
 
  public:
-  explicit StreamDecoderImpl(std::shared_ptr<Listener> listener,
-                             const IpcReadOptions& options)
-      : MessageDecoderListener(),
-        listener_(std::move(listener)),
-        options_(options),
+  explicit StreamDecoderImpl(std::shared_ptr<Listener> listener, IpcReadOptions options)
+      : listener_(std::move(listener)),
+        options_(std::move(options)),
         state_(State::SCHEMA),
         message_decoder_(std::shared_ptr<MessageDecoderListener>(this, [](void*) {}),
                          options_.memory_pool),
-        field_inclusion_mask_(),
-        n_required_dictionaries_(0),
-        dictionary_memo_(),
-        schema_() {}
+        n_required_dictionaries_(0) {}
 
   Status OnMessageDecoded(std::unique_ptr<Message> message) override {
+    ++stats_.num_messages;
     switch (state_) {
       case State::SCHEMA:
         ARROW_RETURN_NOT_OK(OnSchemaMessageDecoded(std::move(message)));
@@ -1189,6 +1185,8 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener {
 
   int64_t next_required_size() const { return message_decoder_.next_required_size(); }
 
+  ReadStats stats() const { return stats_; }
+
  private:
   Status OnSchemaMessageDecoded(std::unique_ptr<Message> message) {
     RETURN_NOT_OK(UnpackSchemaMessage(*message, options_, &dictionary_memo_, &schema_,
@@ -1229,30 +1227,42 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener {
           auto batch,
           ReadRecordBatchInternal(*message->metadata(), schema_, field_inclusion_mask_,
                                   &dictionary_memo_, options_, reader.get()));
+      ++stats_.num_record_batches;
       return listener_->OnRecordBatchDecoded(std::move(batch));
     }
   }
 
   // Read dictionary from dictionary batch
   Status ReadDictionary(const Message& message) {
-    // TODO accumulate and expose ReadStats
-    DictionaryKind unused_kind;
-    return ::arrow::ipc::ReadDictionary(message, &dictionary_memo_, options_,
-                                        &unused_kind);
+    DictionaryKind kind;
+    RETURN_NOT_OK(
+        ::arrow::ipc::ReadDictionary(message, &dictionary_memo_, options_, &kind));
+    ++stats_.num_dictionary_batches;
+    switch (kind) {
+      case DictionaryKind::New:
+        break;
+      case DictionaryKind::Delta:
+        ++stats_.num_dictionary_deltas;
+        break;
+      case DictionaryKind::Replacement:
+        ++stats_.num_replaced_dictionaries;
+        break;
+    }
+    return Status::OK();
   }
 
   std::shared_ptr<Listener> listener_;
-  IpcReadOptions options_;
+  const IpcReadOptions options_;
   State state_;
   MessageDecoder message_decoder_;
   std::vector<bool> field_inclusion_mask_;
   int n_required_dictionaries_;
   DictionaryMemo dictionary_memo_;
   std::shared_ptr<Schema> schema_, out_schema_;
+  ReadStats stats_;
 };
 
-StreamDecoder::StreamDecoder(std::shared_ptr<Listener> listener,
-                             const IpcReadOptions& options) {
+StreamDecoder::StreamDecoder(std::shared_ptr<Listener> listener, IpcReadOptions options) {
   impl_.reset(new StreamDecoderImpl(std::move(listener), options));
 }
 
@@ -1269,6 +1279,8 @@ std::shared_ptr<Schema> StreamDecoder::schema() const { return impl_->schema(); }
 
 int64_t StreamDecoder::next_required_size() const { return impl_->next_required_size(); }
 
+ReadStats StreamDecoder::stats() const { return impl_->stats(); }
+
 Result<std::shared_ptr<Schema>> ReadSchema(io::InputStream* stream,
                                            DictionaryMemo* dictionary_memo) {
   std::unique_ptr<MessageReader> reader = MessageReader::Open(stream);
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 8a663879a40..31ceb0c31aa 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -267,7 +267,7 @@ class ARROW_EXPORT StreamDecoder {
   /// Listener::OnRecordBatchDecoded() to receive decoded record batches
   /// \param[in] options any IPC reading options (optional)
   StreamDecoder(std::shared_ptr<Listener> listener,
-                const IpcReadOptions& options = IpcReadOptions::Defaults());
+                IpcReadOptions options = IpcReadOptions::Defaults());
 
   virtual ~StreamDecoder();
 
@@ -360,6 +360,9 @@ class ARROW_EXPORT StreamDecoder {
   /// decoder
   int64_t next_required_size() const;
 
+  /// \brief Return current read statistics
+  ReadStats stats() const;
+
  private:
   class StreamDecoderImpl;
   std::unique_ptr<StreamDecoderImpl> impl_;
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 624edec0e2f..0e9e8b1690f 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -69,6 +69,18 @@ using internal::kArrowMagicBytes;
 
 namespace {
 
+bool HasNestedDict(const ArrayData& data) {
+  if (data.type->id() == Type::DICTIONARY) {
+    return true;
+  }
+  for (const auto& child : data.child_data) {
+    if (HasNestedDict(*child)) {
+      return true;
+    }
+  }
+  return false;
+}
+
 Status GetTruncatedBitmap(int64_t offset, int64_t length,
                           const std::shared_ptr<Buffer> input, MemoryPool* pool,
                           std::shared_ptr<Buffer>* buffer) {
@@ -984,7 +996,9 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
 
     IpcPayload payload;
     RETURN_NOT_OK(GetRecordBatchPayload(batch, options_, &payload));
-    return payload_writer_->WritePayload(payload);
+    RETURN_NOT_OK(WritePayload(payload));
+    ++stats_.num_record_batches;
+    return Status::OK();
   }
 
   Status Close() override {
@@ -998,9 +1012,11 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
 
     IpcPayload payload;
     RETURN_NOT_OK(GetSchemaPayload(schema_, options_, mapper_, &payload));
-    return payload_writer_->WritePayload(payload);
+    return WritePayload(payload);
   }
 
+  WriteStats stats() const override { return stats_; }
+
  protected:
   Status CheckStarted() {
     if (!started_) {
@@ -1011,39 +1027,66 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
 
   Status WriteDictionaries(const RecordBatch& batch) {
     ARROW_ASSIGN_OR_RAISE(const auto dictionaries, CollectDictionaries(batch, mapper_));
+    const auto equal_options = EqualOptions().nans_equal(true);
 
     for (const auto& pair : dictionaries) {
-      IpcPayload payload;
       int64_t dictionary_id = pair.first;
       const auto& dictionary = pair.second;
 
       // If a dictionary with this id was already emitted, check if it was the same.
       auto* last_dictionary = &last_dictionaries_[dictionary_id];
       const bool dictionary_exists = (*last_dictionary != nullptr);
+      int64_t delta_start = 0;
       if (dictionary_exists) {
         if ((*last_dictionary)->data() == dictionary->data()) {
           // Fast shortcut for a common case.
          // Same dictionary data by pointer => no need to emit it again
           continue;
         }
-        if ((*last_dictionary)->Equals(dictionary, EqualOptions().nans_equal(true))) {
+        const int64_t last_length = (*last_dictionary)->length();
+        const int64_t new_length = dictionary->length();
+        if (new_length == last_length &&
+            ((*last_dictionary)->Equals(dictionary, equal_options))) {
           // Same dictionary by value => no need to emit it again
           // (while this can have a CPU cost, this code path is required
           // for the IPC file format)
           continue;
         }
-        // TODO check for possible delta?
-      }
+        if (is_file_format_) {
+          return Status::Invalid(
+              "Dictionary replacement detected when writing IPC file format. "
+              "Arrow IPC files only support a single dictionary for a given field "
+              "across all batches.");
+        }
 
-      if (is_file_format_ && dictionary_exists) {
-        return Status::Invalid(
-            "Dictionary replacement detected when writing IPC file format. "
-            "Arrow IPC files only support a single dictionary for a given field "
-            "across all batches.");
+        // (the read path doesn't support outer dictionary deltas, don't emit them)
+        if (new_length > last_length && options_.emit_dictionary_deltas &&
+            !HasNestedDict(*dictionary->data()) &&
+            ((*last_dictionary)
+                 ->RangeEquals(dictionary, 0, last_length, 0, equal_options))) {
+          // New dictionary starts with the current dictionary
+          delta_start = last_length;
+        }
       }
 
-      RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, options_, &payload));
-      RETURN_NOT_OK(payload_writer_->WritePayload(payload));
+      IpcPayload payload;
+      if (delta_start) {
+        RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, /*is_delta=*/true,
+                                           dictionary->Slice(delta_start), options_,
+                                           &payload));
+      } else {
+        RETURN_NOT_OK(
+            GetDictionaryPayload(dictionary_id, dictionary, options_, &payload));
+      }
+      RETURN_NOT_OK(WritePayload(payload));
+      ++stats_.num_dictionary_batches;
+      if (dictionary_exists) {
+        if (delta_start) {
+          ++stats_.num_dictionary_deltas;
+        } else {
+          ++stats_.num_replaced_dictionaries;
+        }
+      }
 
       // Remember dictionary for next batches
       *last_dictionary = dictionary;
@@ -1051,6 +1094,12 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
     return Status::OK();
   }
 
+  Status WritePayload(const IpcPayload& payload) {
+    RETURN_NOT_OK(payload_writer_->WritePayload(payload));
+    ++stats_.num_messages;
+    return Status::OK();
+  }
+
   std::unique_ptr<internal::IpcPayloadWriter> payload_writer_;
   std::shared_ptr<Schema> shared_schema_;
   const Schema& schema_;
@@ -1066,6 +1115,7 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
   bool started_ = false;
   IpcWriteOptions options_;
+  WriteStats stats_;
 };
 
 class StreamBookKeeper {
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 48f3a9a3b58..704dcc8646a 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -60,6 +60,23 @@ struct IpcPayload {
   int64_t body_length = 0;
 };
 
+struct WriteStats {
+  /// Number of IPC messages written.
+  int64_t num_messages = 0;
+  /// Number of record batches written.
+  int64_t num_record_batches = 0;
+  /// Number of dictionary batches written.
+  ///
+  /// Note: num_dictionary_batches >= num_dictionary_deltas + num_replaced_dictionaries
+  int64_t num_dictionary_batches = 0;
+
+  /// Number of dictionary deltas written.
+  int64_t num_dictionary_deltas = 0;
+  /// Number of replaced dictionaries (i.e. where a dictionary batch replaces
+  /// an existing dictionary with an unrelated new dictionary).
+ int64_t num_replaced_dictionaries = 0; +}; + /// \class RecordBatchWriter /// \brief Abstract interface for writing a stream of record batches class ARROW_EXPORT RecordBatchWriter { @@ -87,6 +104,9 @@ class ARROW_EXPORT RecordBatchWriter { /// /// \return Status virtual Status Close() = 0; + + /// \brief Return current write statistics + virtual WriteStats stats() const = 0; }; /// Create a new IPC stream writer from stream sink and schema. User is diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 4d49532715f..45a39061919 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1359,6 +1359,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: CMetadataVersion metadata_version shared_ptr[CCodec] codec c_bool use_threads + c_bool emit_dictionary_deltas @staticmethod CIpcWriteOptions Defaults() @@ -1371,6 +1372,20 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: @staticmethod CIpcReadOptions Defaults() + cdef cppclass CIpcWriteStats" arrow::ipc::WriteStats": + int64_t num_messages + int64_t num_record_batches + int64_t num_dictionary_batches + int64_t num_dictionary_deltas + int64_t num_replaced_dictionaries + + cdef cppclass CIpcReadStats" arrow::ipc::ReadStats": + int64_t num_messages + int64_t num_record_batches + int64_t num_dictionary_batches + int64_t num_dictionary_deltas + int64_t num_replaced_dictionaries + cdef cppclass CDictionaryMemo" arrow::ipc::DictionaryMemo": pass @@ -1409,6 +1424,8 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: CStatus WriteRecordBatch(const CRecordBatch& batch) CStatus WriteTable(const CTable& table, int64_t max_chunksize) + CIpcWriteStats stats() + cdef cppclass CRecordBatchStreamReader \ " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader): @staticmethod @@ -1420,13 +1437,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: unique_ptr[CMessageReader] message_reader, const CIpcReadOptions& options) - CResult[shared_ptr[CRecordBatchWriter]] MakeStreamWriter( - shared_ptr[COutputStream] sink, const shared_ptr[CSchema]& schema, - CIpcWriteOptions& options) - - CResult[shared_ptr[CRecordBatchWriter]] MakeFileWriter( - shared_ptr[COutputStream] sink, const shared_ptr[CSchema]& schema, - CIpcWriteOptions& options) + CIpcReadStats stats() cdef cppclass CRecordBatchFileReader \ " arrow::ipc::RecordBatchFileReader": @@ -1446,6 +1457,16 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: CResult[shared_ptr[CRecordBatch]] ReadRecordBatch(int i) + CIpcReadStats stats() + + CResult[shared_ptr[CRecordBatchWriter]] MakeStreamWriter( + shared_ptr[COutputStream] sink, const shared_ptr[CSchema]& schema, + CIpcWriteOptions& options) + + CResult[shared_ptr[CRecordBatchWriter]] MakeFileWriter( + shared_ptr[COutputStream] sink, const shared_ptr[CSchema]& schema, + CIpcWriteOptions& options) + CResult[unique_ptr[CMessage]] ReadMessage(CInputStream* stream, CMemoryPool* pool) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index b1f7a9e51eb..7d3f7004816 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. 
+from collections import namedtuple
 import warnings
 
@@ -45,6 +46,44 @@ cdef CMetadataVersion _unwrap_metadata_version(
     raise ValueError("Not a metadata version: " + repr(version))
 
 
+_WriteStats = namedtuple(
+    'WriteStats',
+    ('num_messages', 'num_record_batches', 'num_dictionary_batches',
+     'num_dictionary_deltas', 'num_replaced_dictionaries'))
+
+
+class WriteStats(_WriteStats):
+    """IPC write statistics
+    """
+    __slots__ = ()
+
+
+@staticmethod
+cdef _wrap_write_stats(CIpcWriteStats c):
+    return WriteStats(c.num_messages, c.num_record_batches,
+                      c.num_dictionary_batches, c.num_dictionary_deltas,
+                      c.num_replaced_dictionaries)
+
+
+_ReadStats = namedtuple(
+    'ReadStats',
+    ('num_messages', 'num_record_batches', 'num_dictionary_batches',
+     'num_dictionary_deltas', 'num_replaced_dictionaries'))
+
+
+class ReadStats(_ReadStats):
+    """IPC read statistics
+    """
+    __slots__ = ()
+
+
+@staticmethod
+cdef _wrap_read_stats(CIpcReadStats c):
+    return ReadStats(c.num_messages, c.num_record_batches,
+                     c.num_dictionary_batches, c.num_dictionary_deltas,
+                     c.num_replaced_dictionaries)
+
+
 cdef class IpcWriteOptions(_Weakrefable):
     """Serialization options for the IPC format.
 
@@ -61,6 +100,9 @@ cdef class IpcWriteOptions(_Weakrefable):
     use_threads: bool
         Whether to use the global CPU thread pool to parallelize any
         computational tasks like compression.
+    emit_dictionary_deltas: bool
+        Whether to emit dictionary deltas. Default is false for maximum
+        stream compatibility.
     """
     __slots__ = ()
 
@@ -68,13 +110,14 @@ cdef class IpcWriteOptions(_Weakrefable):
     def __init__(self, *, metadata_version=MetadataVersion.V5,
                  use_legacy_format=False,
                  compression=None,
-                 bint use_threads=True):
+                 bint use_threads=True, bint emit_dictionary_deltas=False):
         self.c_options = CIpcWriteOptions.Defaults()
         self.use_legacy_format = use_legacy_format
         self.metadata_version = metadata_version
         if compression is not None:
             self.compression = compression
         self.use_threads = use_threads
+        self.emit_dictionary_deltas = emit_dictionary_deltas
 
     @property
     def use_legacy_format(self):
@@ -115,6 +158,14 @@ cdef class IpcWriteOptions(_Weakrefable):
     def use_threads(self, bint value):
         self.c_options.use_threads = value
 
+    @property
+    def emit_dictionary_deltas(self):
+        return self.c_options.emit_dictionary_deltas
+
+    @emit_dictionary_deltas.setter
+    def emit_dictionary_deltas(self, bint value):
+        self.c_options.emit_dictionary_deltas = value
+
 
 cdef class Message(_Weakrefable):
     """
@@ -356,6 +407,15 @@ cdef class _CRecordBatchWriter(_Weakrefable):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.close()
 
+    @property
+    def stats(self):
+        """
+        Current IPC write statistics.
+        """
+        if not self.writer:
+            raise ValueError("Operation on closed writer")
+        return _wrap_write_stats(self.writer.get().stats())
+
 
 cdef class _RecordBatchStreamWriter(_CRecordBatchWriter):
     cdef:
@@ -568,6 +628,7 @@ cdef class _RecordBatchStreamReader(RecordBatchReader):
     cdef:
         shared_ptr[CInputStream] in_stream
         CIpcReadOptions options
+        CRecordBatchStreamReader* stream_reader
 
     def __cinit__(self):
         pass
@@ -577,6 +638,16 @@ cdef class _RecordBatchStreamReader(RecordBatchReader):
         with nogil:
             self.reader = GetResultValue(CRecordBatchStreamReader.Open(
                 self.in_stream, self.options))
+        self.stream_reader = <CRecordBatchStreamReader*> self.reader.get()
+
+    @property
+    def stats(self):
+        """
+        Current IPC read statistics.
+ """ + if not self.reader: + raise ValueError("Operation on closed reader") + return _wrap_read_stats(self.stream_reader.stats()) cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter): @@ -678,6 +749,15 @@ cdef class _RecordBatchFileReader(_Weakrefable): def __exit__(self, exc_type, exc_value, traceback): pass + @property + def stats(self): + """ + Current IPC read statistics. + """ + if not self.reader: + raise ValueError("Operation on closed reader") + return _wrap_read_stats(self.reader.get().stats()) + def get_tensor_size(Tensor tensor): """ diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index 65325c483c4..049d0c95c4b 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -21,7 +21,8 @@ import pyarrow as pa -from pyarrow.lib import (IpcWriteOptions, Message, MessageReader, # noqa +from pyarrow.lib import (IpcWriteOptions, ReadStats, WriteStats, # noqa + Message, MessageReader, RecordBatchReader, _ReadPandasMixin, MetadataVersion, read_message, read_record_batch, read_schema, diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index 2306183b0b9..3132ddc94ef 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -37,6 +37,7 @@ class IpcFixture: + write_stats = None def __init__(self, sink_factory=lambda: io.BytesIO()): self._sink_factory = sink_factory @@ -69,6 +70,7 @@ def write_batches(self, num_batches=5, as_table=False): for batch in batches: writer.write_batch(batch) + self.write_stats = writer.stats writer.close() return batches @@ -92,6 +94,10 @@ def _check_roundtrip(self, as_table=False): assert batches[i].equals(batch) assert reader.schema.equals(batches[0].schema) + assert isinstance(reader.stats, pa.ipc.ReadStats) + assert isinstance(self.write_stats, pa.ipc.WriteStats) + assert tuple(reader.stats) == tuple(self.write_stats) + class StreamFormatFixture(IpcFixture): @@ -177,6 +183,12 @@ def test_open_file_from_buffer(file_fixture): assert result1.equals(result2) assert result1.equals(result3) + st1 = reader1.stats + assert st1.num_messages == 6 + assert st1.num_record_batches == 5 + assert reader2.stats == st1 + assert reader3.stats == st1 + @pytest.mark.pandas def test_file_read_pandas(file_fixture): @@ -243,6 +255,14 @@ def test_open_stream_from_buffer(stream_fixture): assert result1.equals(result2) assert result1.equals(result3) + st1 = reader1.stats + assert st1.num_messages == 6 + assert st1.num_record_batches == 5 + assert reader2.stats == st1 + assert reader3.stats == st1 + + assert tuple(st1) == tuple(stream_fixture.write_stats) + @pytest.mark.pandas def test_stream_write_dispatch(stream_fixture): @@ -387,6 +407,42 @@ def test_stream_options_roundtrip(stream_fixture, options): reader.read_next_batch() +def test_dictionary_delta(stream_fixture): + ty = pa.dictionary(pa.int8(), pa.utf8()) + data = [["foo", "foo", None], + ["foo", "bar", "foo"], # potential delta + ["foo", "bar"], + ["foo", None, "bar", "quux"], # potential delta + ["bar", "quux"], # replacement + ] + batches = [ + pa.RecordBatch.from_arrays([pa.array(v, type=ty)], names=['dicts']) + for v in data] + schema = batches[0].schema + + def write_batches(): + with stream_fixture._get_writer(pa.MockOutputStream(), + schema) as writer: + for batch in batches: + writer.write_batch(batch) + return writer.stats + + st = write_batches() + assert st.num_record_batches == 5 + assert st.num_dictionary_batches == 4 + assert st.num_replaced_dictionaries == 3 + assert st.num_dictionary_deltas == 0 + + 
    stream_fixture.use_legacy_ipc_format = None
+    stream_fixture.options = pa.ipc.IpcWriteOptions(
+        emit_dictionary_deltas=True)
+    st = write_batches()
+    assert st.num_record_batches == 5
+    assert st.num_dictionary_batches == 4
+    assert st.num_replaced_dictionaries == 1
+    assert st.num_dictionary_deltas == 2
+
+
 def test_envvar_set_legacy_ipc_format():
     schema = pa.schema([pa.field('foo', pa.int32())])

From be9d9215ce8c0f6b2ee18f18536f879a5778536b Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Thu, 10 Dec 2020 12:43:56 +0100
Subject: [PATCH 2/2] Update feature matrix

---
 docs/source/status.rst | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/docs/source/status.rst b/docs/source/status.rst
index 06054d2e501..e4fe8622ce3 100644
--- a/docs/source/status.rst
+++ b/docs/source/status.rst
@@ -118,7 +118,7 @@ IPC Format
 +-----------------------------+-------+-------+-------+------------+-------+-------+
 | Replacement dictionaries    | ✓     | ✓     |       |            |       |       |
 +-----------------------------+-------+-------+-------+------------+-------+-------+
-| Delta dictionaries          |       |       |       |            |       |       |
+| Delta dictionaries          | ✓ (1) |       |       |            |       |       |
 +-----------------------------+-------+-------+-------+------------+-------+-------+
 | Tensors                     | ✓     |       |       |            |       |       |
 +-----------------------------+-------+-------+-------+------------+-------+-------+
@@ -127,6 +127,13 @@ IPC Format
 | Custom schema metadata      | ✓     | ✓     |       |            |       |       |
 +-----------------------------+-------+-------+-------+------------+-------+-------+
 
+Notes:
+
+* \(1) Delta dictionaries are not supported for nested dictionaries
+
+.. seealso::
+   The :ref:`format-ipc` specification.
+
 Flight RPC
 ==========
 
@@ -177,6 +184,22 @@ C Data Interface
    The :ref:`C Data Interface <c-data-interface>` specification.
 
+C Stream Interface (experimental)
+=================================
+
++-----------------------------+-------+--------+
+| Feature                     | C++   | Python |
+|                             |       |        |
++=============================+=======+========+
+| Stream export               | ✓     | ✓      |
++-----------------------------+-------+--------+
+| Stream import               | ✓     | ✓      |
++-----------------------------+-------+--------+
+
+.. seealso::
+   The :ref:`C Stream Interface <c-stream-interface>` specification.
+
+
 Third-Party Data Formats
 ========================
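
Reviewer note (not part of the patch): a minimal end-to-end sketch of the new
emit_dictionary_deltas option and the writer/reader stats properties added above,
assuming a pyarrow build that includes these changes. Only the stream format
supports deltas; the file format still rejects dictionary replacements. The
expected counts in the comments mirror the test expectations above and are
illustrative only.

    import io
    import pyarrow as pa

    ty = pa.dictionary(pa.int8(), pa.utf8())
    # The second dictionary extends the first, so a delta can be emitted
    batches = [
        pa.record_batch([pa.array(v, type=ty)], names=['vals'])
        for v in [["foo", "bar"], ["foo", "bar", "quux"]]
    ]

    sink = io.BytesIO()
    options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
    with pa.ipc.new_stream(sink, batches[0].schema, options=options) as writer:
        for batch in batches:
            writer.write_batch(batch)
    # Expect 2 dictionary batches, 1 of them a delta (0 replacements)
    print(writer.stats)

    reader = pa.ipc.open_stream(sink.getvalue())
    reader.read_all()
    print(reader.stats)  # mirrors the writer-side statistics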