diff --git a/cpp/src/arrow/array/array_base.cc b/cpp/src/arrow/array/array_base.cc
index 265f3d91372..67c5ca84e1f 100644
--- a/cpp/src/arrow/array/array_base.cc
+++ b/cpp/src/arrow/array/array_base.cc
@@ -222,29 +222,31 @@ bool Array::ApproxEquals(const std::shared_ptr<Array>& arr,
 }
 
 bool Array::RangeEquals(const Array& other, int64_t start_idx, int64_t end_idx,
-                        int64_t other_start_idx) const {
-  return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx);
+                        int64_t other_start_idx, const EqualOptions& opts) const {
+  return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx, opts);
 }
 
 bool Array::RangeEquals(const std::shared_ptr<Array>& other, int64_t start_idx,
-                        int64_t end_idx, int64_t other_start_idx) const {
+                        int64_t end_idx, int64_t other_start_idx,
+                        const EqualOptions& opts) const {
   if (!other) {
     return false;
   }
-  return ArrayRangeEquals(*this, *other, start_idx, end_idx, other_start_idx);
+  return ArrayRangeEquals(*this, *other, start_idx, end_idx, other_start_idx, opts);
 }
 
 bool Array::RangeEquals(int64_t start_idx, int64_t end_idx, int64_t other_start_idx,
-                        const Array& other) const {
-  return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx);
+                        const Array& other, const EqualOptions& opts) const {
+  return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx, opts);
 }
 
 bool Array::RangeEquals(int64_t start_idx, int64_t end_idx, int64_t other_start_idx,
-                        const std::shared_ptr<Array>& other) const {
+                        const std::shared_ptr<Array>& other,
+                        const EqualOptions& opts) const {
   if (!other) {
     return false;
   }
-  return ArrayRangeEquals(*this, *other, start_idx, end_idx, other_start_idx);
+  return ArrayRangeEquals(*this, *other, start_idx, end_idx, other_start_idx, opts);
 }
 
 std::shared_ptr<Array> Array::Slice(int64_t offset, int64_t length) const {
diff --git a/cpp/src/arrow/array/array_base.h b/cpp/src/arrow/array/array_base.h
index ef0b1b626e3..9bcd1621840 100644
--- a/cpp/src/arrow/array/array_base.h
+++ b/cpp/src/arrow/array/array_base.h
@@ -119,13 +119,17 @@ class ARROW_EXPORT Array {
   /// Compare if the range of slots specified are equal for the given array and
   /// this array. end_idx exclusive. This method does not bounds check.
   bool RangeEquals(int64_t start_idx, int64_t end_idx, int64_t other_start_idx,
-                   const Array& other) const;
+                   const Array& other,
+                   const EqualOptions& = EqualOptions::Defaults()) const;
   bool RangeEquals(int64_t start_idx, int64_t end_idx, int64_t other_start_idx,
-                   const std::shared_ptr<Array>& other) const;
+                   const std::shared_ptr<Array>& other,
+                   const EqualOptions& = EqualOptions::Defaults()) const;
   bool RangeEquals(const Array& other, int64_t start_idx, int64_t end_idx,
-                   int64_t other_start_idx) const;
+                   int64_t other_start_idx,
+                   const EqualOptions& = EqualOptions::Defaults()) const;
   bool RangeEquals(const std::shared_ptr<Array>& other, int64_t start_idx,
-                   int64_t end_idx, int64_t other_start_idx) const;
+                   int64_t end_idx, int64_t other_start_idx,
+                   const EqualOptions& = EqualOptions::Defaults()) const;
 
   Status Accept(ArrayVisitor* visitor) const;
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index 5c56e6409a7..f42fbdaa9cf 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -664,12 +664,14 @@ class GrpcStreamWriter : public FlightStreamWriter {
     }
     return Status::OK();
   }
+
   Status WriteWithMetadata(const RecordBatch& batch,
                            std::shared_ptr<Buffer> app_metadata) override {
     RETURN_NOT_OK(CheckStarted());
     app_metadata_ = app_metadata;
     return batch_writer_->WriteRecordBatch(batch);
   }
+
   Status DoneWriting() override {
     // Do not CheckStarted - DoneWriting applies to data and metadata
     if (batch_writer_) {
@@ -683,6 +685,7 @@ class GrpcStreamWriter : public FlightStreamWriter {
     }
     return writer_->DoneWriting();
   }
+
   Status Close() override {
     // Do not CheckStarted - Close applies to data and metadata
     if (batch_writer_ && !writer_closed_) {
@@ -698,6 +701,11 @@ class GrpcStreamWriter : public FlightStreamWriter {
     return writer_->Finish(Status::OK());
   }
 
+  ipc::WriteStats stats() const override {
+    ARROW_CHECK_NE(batch_writer_, nullptr);
+    return batch_writer_->stats();
+  }
+
  private:
   friend class DoPutPayloadWriter;
   std::shared_ptr<Buffer> app_metadata_;
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index 87c96ce4910..4e35950c871 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -314,7 +314,9 @@ class DoExchangeMessageWriter : public FlightMessageWriter {
       payload.app_metadata = app_metadata;
     }
     RETURN_NOT_OK(ipc::GetRecordBatchPayload(batch, ipc_options_, &payload.ipc_message));
-    return WritePayload(payload);
+    RETURN_NOT_OK(WritePayload(payload));
+    ++stats_.num_record_batches;
+    return Status::OK();
   }
 
   Status Close() override {
@@ -322,12 +324,15 @@ class DoExchangeMessageWriter : public FlightMessageWriter {
     return Status::OK();
   }
 
+  ipc::WriteStats stats() const override { return stats_; }
+
  private:
   Status WritePayload(const FlightPayload& payload) {
     if (!internal::WritePayload(payload, stream_)) {
       // gRPC doesn't give us any way to find what the error was (if any).
       return Status::IOError("Could not write payload to stream");
     }
+    ++stats_.num_messages;
     return Status::OK();
   }
 
@@ -350,6 +355,7 @@ class DoExchangeMessageWriter : public FlightMessageWriter {
       RETURN_NOT_OK(ipc::GetDictionaryPayload(pair.first, pair.second, ipc_options_,
                                               &payload.ipc_message));
       RETURN_NOT_OK(WritePayload(payload));
+      ++stats_.num_dictionary_batches;
     }
     return Status::OK();
   }
@@ -357,6 +363,7 @@ class DoExchangeMessageWriter : public FlightMessageWriter {
   grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* stream_;
   ::arrow::ipc::IpcWriteOptions ipc_options_;
   ipc::DictionaryFieldMapper mapper_;
+  ipc::WriteStats stats_;
   bool started_ = false;
   bool dictionaries_written_ = false;
 };
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index bf535cdacf3..83a3c2d0330 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -65,6 +65,20 @@ struct ARROW_EXPORT IpcWriteOptions {
   /// like compression
   bool use_threads = true;
 
+  /// \brief Whether to emit dictionary deltas
+  ///
+  /// If false, a changed dictionary for a given field will emit a full
+  /// dictionary replacement.
+  /// If true, a changed dictionary will be compared against the previous
+  /// version. If possible, a dictionary delta will be emitted, otherwise
+  /// a full dictionary replacement.
+  ///
+  /// Default is false to maximize stream compatibility.
+  ///
+  /// Also, note that if a changed dictionary is a nested dictionary,
+  /// then a delta is never emitted, for compatibility with the read path.
+  bool emit_dictionary_deltas = false;
+
   /// \brief Format version to use for IPC messages and their metadata.
   ///
   /// Presently using V5 version (readable by 1.0.0 and later).
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 2e1e6ad615f..3180a6880b2 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -929,8 +929,11 @@ struct FileWriterHelper {
     return Status::OK();
   }
 
-  Status Finish() {
+  Status Finish(WriteStats* out_stats = nullptr) {
     RETURN_NOT_OK(writer_->Close());
+    if (out_stats) {
+      *out_stats = writer_->stats();
+    }
     RETURN_NOT_OK(sink_->Close());
     // Current offset into stream is the end of the file
     return sink_->Tell().Value(&footer_offset_);
@@ -997,8 +1000,11 @@ struct StreamWriterHelper {
     return Status::OK();
   }
 
-  Status Finish() {
+  Status Finish(WriteStats* out_stats = nullptr) {
     RETURN_NOT_OK(writer_->Close());
+    if (out_stats) {
+      *out_stats = writer_->stats();
+    }
     return sink_->Close();
   }
 
@@ -1034,13 +1040,13 @@ struct StreamDecoderWriterHelper : public StreamWriterHelper {
   Status ReadBatches(const IpcReadOptions& options, BatchVector* out_batches,
                      ReadStats* out_stats = nullptr) override {
-    if (out_stats) {
-      return Status::NotImplemented("StreamDecoder does not support stats()");
-    }
     auto listener = std::make_shared<CollectListener>();
     StreamDecoder decoder(listener, options);
     RETURN_NOT_OK(DoConsume(&decoder));
     *out_batches = listener->record_batches();
+    if (out_stats) {
+      *out_stats = decoder.stats();
+    }
     return Status::OK();
   }
 
@@ -1763,6 +1769,40 @@ class TestDictionaryReplacement : public ::testing::Test {
     EXPECT_EQ(read_stats_.num_dictionary_deltas, 0);
   }
 
+  void TestDeltaDict() {
+    auto type = dictionary(int8(), utf8());
+    auto batch1 = MakeBatch(ArrayFromJSON(type, R"(["foo", "foo", "bar", null])"));
+    // Potential delta
+    auto batch2 = MakeBatch(ArrayFromJSON(type, R"(["foo", "bar", "quux", "foo"])"));
+    // Potential delta
+    auto batch3 =
+        MakeBatch(ArrayFromJSON(type, R"(["foo", "bar", "quux", "zzz", "foo"])"));
+    auto batch4 = MakeBatch(ArrayFromJSON(type, R"(["bar", null, "quux", "foo"])"));
+    BatchVector batches{batch1, batch2, batch3, batch4};
+    if (WriterHelper::kIsFileFormat) {
+      CheckWritingFails(batches, 1);
+    } else {
+      CheckRoundtrip(batches);
+      EXPECT_EQ(read_stats_.num_messages, 9);  // including schema message
+      EXPECT_EQ(read_stats_.num_record_batches, 4);
+      EXPECT_EQ(read_stats_.num_dictionary_batches, 4);
+      EXPECT_EQ(read_stats_.num_replaced_dictionaries, 3);
+      EXPECT_EQ(read_stats_.num_dictionary_deltas, 0);
+    }
+
+    write_options_.emit_dictionary_deltas = true;
+    if (WriterHelper::kIsFileFormat) {
+      CheckWritingFails(batches, 1);
+    } else {
+      CheckRoundtrip(batches);
+      EXPECT_EQ(read_stats_.num_messages, 9);  // including schema message
+      EXPECT_EQ(read_stats_.num_record_batches, 4);
+      EXPECT_EQ(read_stats_.num_dictionary_batches, 4);
+      EXPECT_EQ(read_stats_.num_replaced_dictionaries, 1);
+      EXPECT_EQ(read_stats_.num_dictionary_deltas, 2);
+    }
+  }
+
   void TestSameDictValuesNested() {
     CheckRoundtrip(SameValuesNestedDictBatches());
 
@@ -1821,13 +1861,96 @@ class TestDictionaryReplacement : public ::testing::Test {
     EXPECT_EQ(read_stats_.num_dictionary_deltas, 0);
   }
 
+  void TestDeltaDictNestedOuter() {
+    // Outer dict changes, inner dict remains the same
+    auto value_type = list(dictionary(int8(), utf8()));
+    auto type = dictionary(int8(), value_type);
+    // Inner dict: ["a", "b"]
+    auto batch1_values = ArrayFromJSON(value_type, R"([["a"], ["b"]])");
+    // Potential delta
+    auto batch2_values = ArrayFromJSON(value_type, R"([["a"], ["b"], ["a", "a"]])");
+    auto batch1 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 0, 1]"), batch1_values);
+    auto batch2 =
+        MakeBatch(type, ArrayFromJSON(int8(), "[2, null, 0, 0]"), batch2_values);
+    BatchVector batches{batch1, batch2};
+
+    if (WriterHelper::kIsFileFormat) {
+      CheckWritingFails(batches, 1);
+    } else {
+      CheckRoundtrip(batches);
+      EXPECT_EQ(read_stats_.num_messages, 6);  // including schema message
+      EXPECT_EQ(read_stats_.num_record_batches, 2);
+      EXPECT_EQ(read_stats_.num_dictionary_batches, 3);
+      EXPECT_EQ(read_stats_.num_replaced_dictionaries, 1);
+      EXPECT_EQ(read_stats_.num_dictionary_deltas, 0);
+    }
+
+    write_options_.emit_dictionary_deltas = true;
+    if (WriterHelper::kIsFileFormat) {
+      CheckWritingFails(batches, 1);
+    } else {
+      // Outer dict deltas are not emitted as the read path doesn't support them
+      CheckRoundtrip(batches);
+      EXPECT_EQ(read_stats_.num_messages, 6);  // including schema message
+      EXPECT_EQ(read_stats_.num_record_batches, 2);
+      EXPECT_EQ(read_stats_.num_dictionary_batches, 3);
+      EXPECT_EQ(read_stats_.num_replaced_dictionaries, 1);
+      EXPECT_EQ(read_stats_.num_dictionary_deltas, 0);
+    }
+  }
+
+  void TestDeltaDictNestedInner() {
+    // Inner dict changes
+    auto value_type = list(dictionary(int8(), utf8()));
+    auto type = dictionary(int8(), value_type);
+    // Inner dict: ["a"]
+    auto batch1_values = ArrayFromJSON(value_type, R"([["a"]])");
+    // Inner dict: ["a", "b"] => potential delta
+    auto batch2_values = ArrayFromJSON(value_type, R"([["a"], ["b"], ["a", "a"]])");
+    // Inner dict: ["a", "b", "c"] => potential delta
+    auto batch3_values = ArrayFromJSON(value_type, R"([["a"], ["b"], ["c"]])");
+    // Inner dict: ["a", "b", "c"]
+    auto batch4_values = ArrayFromJSON(value_type, R"([["a"], ["b", "c"]])");
+    // Inner dict: ["a", "c", "b"] => replacement
+    auto batch5_values = ArrayFromJSON(value_type, R"([["a"], ["c"], ["b"]])");
+    auto batch1 =
+        MakeBatch(type, ArrayFromJSON(int8(), "[0, null, 0]"), batch1_values);
+    auto batch2 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 0, 2]"), batch2_values);
+    auto batch3 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 0, 2]"), batch3_values);
+    auto batch4 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 0, null]"), batch4_values);
+    auto batch5 = MakeBatch(type, ArrayFromJSON(int8(), "[1, 0, 2]"), batch5_values);
+    BatchVector batches{batch1, batch2, batch3, batch4, batch5};
+
+    if (WriterHelper::kIsFileFormat) {
+      CheckWritingFails(batches, 1);
+    } else {
+      CheckRoundtrip(batches);
+      EXPECT_EQ(read_stats_.num_messages, 15);  // including schema message
+      EXPECT_EQ(read_stats_.num_record_batches, 5);
+      EXPECT_EQ(read_stats_.num_dictionary_batches, 9);  // 4 inner + 5 outer
+      EXPECT_EQ(read_stats_.num_replaced_dictionaries, 7);
+      EXPECT_EQ(read_stats_.num_dictionary_deltas, 0);
+    }
+
+    write_options_.emit_dictionary_deltas = true;
+    if (WriterHelper::kIsFileFormat) {
+      CheckWritingFails(batches, 1);
+    } else {
+      CheckRoundtrip(batches);
+      EXPECT_EQ(read_stats_.num_messages, 15);  // including schema message
+      EXPECT_EQ(read_stats_.num_record_batches, 5);
+      EXPECT_EQ(read_stats_.num_dictionary_batches, 9);  // 4 inner + 5 outer
+      EXPECT_EQ(read_stats_.num_replaced_dictionaries, 5);
+      EXPECT_EQ(read_stats_.num_dictionary_deltas, 2);
+    }
+  }
+
   Status RoundTrip(const BatchVector& in_batches, BatchVector* out_batches) {
     WriterHelper writer_helper;
     RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema(), write_options_));
     for (const auto& batch : in_batches) {
       RETURN_NOT_OK(writer_helper.WriteBatch(batch));
     }
-    RETURN_NOT_OK(writer_helper.Finish());
+    RETURN_NOT_OK(writer_helper.Finish(&write_stats_));
     RETURN_NOT_OK(writer_helper.ReadBatches(read_options_, out_batches, &read_stats_));
     for (const auto& batch : *out_batches) {
       RETURN_NOT_OK(batch->ValidateFull());
@@ -1838,6 +1961,7 @@ class TestDictionaryReplacement : public ::testing::Test {
   void CheckRoundtrip(const BatchVector& in_batches) {
     BatchVector out_batches;
     ASSERT_OK(RoundTrip(in_batches, &out_batches));
+    CheckStatsConsistent();
     ASSERT_EQ(in_batches.size(), out_batches.size());
     for (size_t i = 0; i < in_batches.size(); ++i) {
       AssertBatchesEqual(*in_batches[i], *out_batches[i]);
@@ -1853,6 +1977,15 @@ class TestDictionaryReplacement : public ::testing::Test {
     ASSERT_RAISES(Invalid, writer_helper.WriteBatch(in_batches[fails_at_batch_num]));
   }
 
+  void CheckStatsConsistent() {
+    ASSERT_EQ(read_stats_.num_messages, write_stats_.num_messages);
+    ASSERT_EQ(read_stats_.num_record_batches, write_stats_.num_record_batches);
+    ASSERT_EQ(read_stats_.num_dictionary_batches, write_stats_.num_dictionary_batches);
+    ASSERT_EQ(read_stats_.num_replaced_dictionaries,
+              write_stats_.num_replaced_dictionaries);
+    ASSERT_EQ(read_stats_.num_dictionary_deltas, write_stats_.num_dictionary_deltas);
+  }
+
   BatchVector DifferentOrderDictBatches() {
     // Create two separate dictionaries with different order
     auto type = dictionary(int8(), utf8());
@@ -1919,6 +2052,7 @@ class TestDictionaryReplacement : public ::testing::Test {
  protected:
   IpcWriteOptions write_options_ = IpcWriteOptions::Defaults();
   IpcReadOptions read_options_ = IpcReadOptions::Defaults();
+  WriteStats write_stats_;
   ReadStats read_stats_;
 };
 
@@ -1928,6 +2062,8 @@ TYPED_TEST_P(TestDictionaryReplacement, SameDictPointer) { this->TestSameDictPoi
 }
 
 TYPED_TEST_P(TestDictionaryReplacement, SameDictValues) { this->TestSameDictValues(); }
 
+TYPED_TEST_P(TestDictionaryReplacement, DeltaDict) { this->TestDeltaDict(); }
+
 TYPED_TEST_P(TestDictionaryReplacement, SameDictValuesNested) {
   this->TestSameDictValuesNested();
 }
 
@@ -1940,12 +2076,22 @@ TYPED_TEST_P(TestDictionaryReplacement, DifferentDictValuesNested) {
   this->TestDifferentDictValuesNested();
 }
 
+TYPED_TEST_P(TestDictionaryReplacement, DeltaDictNestedOuter) {
+  this->TestDeltaDictNestedOuter();
+}
+
+TYPED_TEST_P(TestDictionaryReplacement, DeltaDictNestedInner) {
+  this->TestDeltaDictNestedInner();
+}
+
 REGISTER_TYPED_TEST_SUITE_P(TestDictionaryReplacement, SameDictPointer, SameDictValues,
-                            SameDictValuesNested, DifferentDictValues,
-                            DifferentDictValuesNested);
+                            DeltaDict, SameDictValuesNested, DifferentDictValues,
+                            DifferentDictValuesNested, DeltaDictNestedOuter,
+                            DeltaDictNestedInner);
 
 using DictionaryReplacementTestTypes =
-    ::testing::Types<FileWriterHelper, StreamWriterHelper>;
+    ::testing::Types<FileWriterHelper, StreamWriterHelper, StreamDecoderWriterHelper>;
 
 INSTANTIATE_TYPED_TEST_SUITE_P(TestDictionaryReplacement, TestDictionaryReplacement,
                                DictionaryReplacementTestTypes);
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 92f2b70f294..3d855425c7a 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1142,20 +1142,16 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener {
   };
 
  public:
-  explicit StreamDecoderImpl(std::shared_ptr<Listener> listener,
-                             const IpcReadOptions& options)
-      : MessageDecoderListener(),
-        listener_(std::move(listener)),
-        options_(options),
+  explicit StreamDecoderImpl(std::shared_ptr<Listener> listener, IpcReadOptions options)
+      : listener_(std::move(listener)),
+        options_(std::move(options)),
         state_(State::SCHEMA),
         message_decoder_(std::shared_ptr<MessageDecoderListener>(this, [](void*) {}),
                          options_.memory_pool),
-        field_inclusion_mask_(),
-        n_required_dictionaries_(0),
-        dictionary_memo_(),
-        schema_() {}
+        n_required_dictionaries_(0) {}
 
   Status OnMessageDecoded(std::unique_ptr<Message> message) override {
+    ++stats_.num_messages;
     switch (state_) {
       case State::SCHEMA:
         ARROW_RETURN_NOT_OK(OnSchemaMessageDecoded(std::move(message)));
@@ -1189,6 +1185,8 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener {
 
   int64_t next_required_size() const { return message_decoder_.next_required_size(); }
 
+  ReadStats stats() const { return stats_; }
+
  private:
   Status OnSchemaMessageDecoded(std::unique_ptr<Message> message) {
     RETURN_NOT_OK(UnpackSchemaMessage(*message, options_, &dictionary_memo_, &schema_,
@@ -1229,30 +1227,42 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener {
         auto batch,
         ReadRecordBatchInternal(*message->metadata(), schema_, field_inclusion_mask_,
                                 &dictionary_memo_, options_, reader.get()));
+    ++stats_.num_record_batches;
     return listener_->OnRecordBatchDecoded(std::move(batch));
     }
   }
 
   // Read dictionary from dictionary batch
   Status ReadDictionary(const Message& message) {
-    // TODO accumulate and expose ReadStats
-    DictionaryKind unused_kind;
-    return ::arrow::ipc::ReadDictionary(message, &dictionary_memo_, options_,
-                                        &unused_kind);
+    DictionaryKind kind;
+    RETURN_NOT_OK(
+        ::arrow::ipc::ReadDictionary(message, &dictionary_memo_, options_, &kind));
+    ++stats_.num_dictionary_batches;
+    switch (kind) {
+      case DictionaryKind::New:
+        break;
+      case DictionaryKind::Delta:
+        ++stats_.num_dictionary_deltas;
+        break;
+      case DictionaryKind::Replacement:
+        ++stats_.num_replaced_dictionaries;
+        break;
+    }
+    return Status::OK();
   }
 
   std::shared_ptr<Listener> listener_;
-  IpcReadOptions options_;
+  const IpcReadOptions options_;
   State state_;
   MessageDecoder message_decoder_;
   std::vector<bool> field_inclusion_mask_;
   int n_required_dictionaries_;
   DictionaryMemo dictionary_memo_;
   std::shared_ptr<Schema> schema_, out_schema_;
+  ReadStats stats_;
 };
 
-StreamDecoder::StreamDecoder(std::shared_ptr<Listener> listener,
-                             const IpcReadOptions& options) {
+StreamDecoder::StreamDecoder(std::shared_ptr<Listener> listener, IpcReadOptions options) {
   impl_.reset(new StreamDecoderImpl(std::move(listener), options));
 }
 
@@ -1269,6 +1279,8 @@ std::shared_ptr<Schema> StreamDecoder::schema() const { return impl_->schema();
 
 int64_t StreamDecoder::next_required_size() const { return impl_->next_required_size(); }
 
+ReadStats StreamDecoder::stats() const { return impl_->stats(); }
+
 Result<std::shared_ptr<Schema>> ReadSchema(io::InputStream* stream,
                                            DictionaryMemo* dictionary_memo) {
   std::unique_ptr<MessageReader> reader = MessageReader::Open(stream);
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 8a663879a40..31ceb0c31aa 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -267,7 +267,7 @@ class ARROW_EXPORT StreamDecoder {
   /// Listener::OnRecordBatchDecoded() to receive decoded record batches
   /// \param[in] options any IPC reading options (optional)
   StreamDecoder(std::shared_ptr<Listener> listener,
-                const IpcReadOptions& options = IpcReadOptions::Defaults());
+                IpcReadOptions options = IpcReadOptions::Defaults());
 
   virtual ~StreamDecoder();
 
@@ -360,6 +360,9 @@ class ARROW_EXPORT StreamDecoder {
   /// decoder
   int64_t next_required_size() const;
 
+  /// \brief Return current read statistics
+  ReadStats stats() const;
+
  private:
   class StreamDecoderImpl;
   std::unique_ptr<StreamDecoderImpl> impl_;
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 624edec0e2f..0e9e8b1690f 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -69,6 +69,18 @@ using internal::kArrowMagicBytes;
 
 namespace {
 
+bool HasNestedDict(const ArrayData& data) {
+  if (data.type->id() == Type::DICTIONARY) {
+    return true;
+  }
+  for (const auto& child : data.child_data) {
+    if (HasNestedDict(*child)) {
+      return true;
+    }
+  }
+  return false;
+}
+
 Status GetTruncatedBitmap(int64_t offset, int64_t length,
                           const std::shared_ptr<Buffer> input, MemoryPool* pool,
                           std::shared_ptr<Buffer>* buffer) {
@@ -984,7 +996,9 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
 
     IpcPayload payload;
     RETURN_NOT_OK(GetRecordBatchPayload(batch, options_, &payload));
-    return payload_writer_->WritePayload(payload);
+    RETURN_NOT_OK(WritePayload(payload));
+    ++stats_.num_record_batches;
+    return Status::OK();
   }
 
   Status Close() override {
@@ -998,9 +1012,11 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
 
     IpcPayload payload;
     RETURN_NOT_OK(GetSchemaPayload(schema_, options_, mapper_, &payload));
-    return payload_writer_->WritePayload(payload);
+    return WritePayload(payload);
   }
 
+  WriteStats stats() const override { return stats_; }
+
  protected:
   Status CheckStarted() {
     if (!started_) {
@@ -1011,39 +1027,66 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
 
   Status WriteDictionaries(const RecordBatch& batch) {
     ARROW_ASSIGN_OR_RAISE(const auto dictionaries, CollectDictionaries(batch, mapper_));
+    const auto equal_options = EqualOptions().nans_equal(true);
 
     for (const auto& pair : dictionaries) {
-      IpcPayload payload;
       int64_t dictionary_id = pair.first;
       const auto& dictionary = pair.second;
 
       // If a dictionary with this id was already emitted, check if it was the same.
       auto* last_dictionary = &last_dictionaries_[dictionary_id];
       const bool dictionary_exists = (*last_dictionary != nullptr);
+      int64_t delta_start = 0;
       if (dictionary_exists) {
         if ((*last_dictionary)->data() == dictionary->data()) {
           // Fast shortcut for a common case.
           // Same dictionary data by pointer => no need to emit it again
           continue;
         }
-        if ((*last_dictionary)->Equals(dictionary, EqualOptions().nans_equal(true))) {
+        const int64_t last_length = (*last_dictionary)->length();
+        const int64_t new_length = dictionary->length();
+        if (new_length == last_length &&
+            ((*last_dictionary)->Equals(dictionary, equal_options))) {
           // Same dictionary by value => no need to emit it again
           // (while this can have a CPU cost, this code path is required
           // for the IPC file format)
           continue;
         }
-        // TODO check for possible delta?
-      }
+        if (is_file_format_) {
+          return Status::Invalid(
+              "Dictionary replacement detected when writing IPC file format. "
+              "Arrow IPC files only support a single dictionary for a given field "
+              "across all batches.");
+        }
 
-      if (is_file_format_ && dictionary_exists) {
-        return Status::Invalid(
-            "Dictionary replacement detected when writing IPC file format. "
-            "Arrow IPC files only support a single dictionary for a given field "
-            "across all batches.");
+        // (the read path doesn't support outer dictionary deltas, don't emit them)
+        if (new_length > last_length && options_.emit_dictionary_deltas &&
+            !HasNestedDict(*dictionary->data()) &&
+            ((*last_dictionary)
+                 ->RangeEquals(dictionary, 0, last_length, 0, equal_options))) {
+          // New dictionary starts with the current dictionary
+          delta_start = last_length;
+        }
       }
 
-      RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, options_, &payload));
-      RETURN_NOT_OK(payload_writer_->WritePayload(payload));
+      IpcPayload payload;
+      if (delta_start) {
+        RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, /*is_delta=*/true,
+                                           dictionary->Slice(delta_start), options_,
+                                           &payload));
+      } else {
+        RETURN_NOT_OK(
+            GetDictionaryPayload(dictionary_id, dictionary, options_, &payload));
+      }
+      RETURN_NOT_OK(WritePayload(payload));
+      ++stats_.num_dictionary_batches;
+      if (dictionary_exists) {
+        if (delta_start) {
+          ++stats_.num_dictionary_deltas;
+        } else {
+          ++stats_.num_replaced_dictionaries;
+        }
+      }
 
       // Remember dictionary for next batches
       *last_dictionary = dictionary;
@@ -1051,6 +1094,12 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
     return Status::OK();
   }
 
+  Status WritePayload(const IpcPayload& payload) {
+    RETURN_NOT_OK(payload_writer_->WritePayload(payload));
+    ++stats_.num_messages;
+    return Status::OK();
+  }
+
   std::unique_ptr<IpcPayloadWriter> payload_writer_;
   std::shared_ptr<Schema> shared_schema_;
   const Schema& schema_;
@@ -1066,6 +1115,7 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
 
   bool started_ = false;
   IpcWriteOptions options_;
+  WriteStats stats_;
 };
 
 class StreamBookKeeper {
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 48f3a9a3b58..704dcc8646a 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -60,6 +60,23 @@ struct IpcPayload {
   int64_t body_length = 0;
 };
 
+struct WriteStats {
+  /// Number of IPC messages written.
+  int64_t num_messages = 0;
+  /// Number of record batches written.
+  int64_t num_record_batches = 0;
+  /// Number of dictionary batches written.
+  ///
+  /// Note: num_dictionary_batches >= num_dictionary_deltas + num_replaced_dictionaries
+  int64_t num_dictionary_batches = 0;
+
+  /// Number of dictionary deltas written.
+  int64_t num_dictionary_deltas = 0;
+  /// Number of replaced dictionaries (i.e. where a dictionary batch replaces
+  /// an existing dictionary with an unrelated new dictionary).
+  int64_t num_replaced_dictionaries = 0;
+};
+
 /// \class RecordBatchWriter
 /// \brief Abstract interface for writing a stream of record batches
 class ARROW_EXPORT RecordBatchWriter {
@@ -87,6 +104,9 @@ class ARROW_EXPORT RecordBatchWriter {
   ///
   /// \return Status
   virtual Status Close() = 0;
+
+  /// \brief Return current write statistics
+  virtual WriteStats stats() const = 0;
 };
 
 /// Create a new IPC stream writer from stream sink and schema. User is
diff --git a/docs/source/status.rst b/docs/source/status.rst
index 06054d2e501..e4fe8622ce3 100644
--- a/docs/source/status.rst
+++ b/docs/source/status.rst
@@ -118,7 +118,7 @@ IPC Format
 +-----------------------------+-------+-------+-------+------------+-------+-------+
 | Replacement dictionaries    | ✓     | ✓     |       |            |       |       |
 +-----------------------------+-------+-------+-------+------------+-------+-------+
-| Delta dictionaries          |       |       |       |            |       |       |
+| Delta dictionaries          | ✓ (1) |       |       |            |       |       |
 +-----------------------------+-------+-------+-------+------------+-------+-------+
 | Tensors                     | ✓     |       |       |            |       |       |
 +-----------------------------+-------+-------+-------+------------+-------+-------+
@@ -127,6 +127,13 @@ IPC Format
 | Custom schema metadata      | ✓     | ✓     |       |            |       |       |
 +-----------------------------+-------+-------+-------+------------+-------+-------+
 
+Notes:
+
+* \(1) Delta dictionaries not supported on nested dictionaries
+
+.. seealso::
+   The :ref:`format-ipc` specification.
+
 Flight RPC
 ==========
 
@@ -177,6 +184,22 @@ C Data Interface
   The :ref:`C Data Interface <c-data-interface>` specification.
 
+C Stream Interface (experimental)
+=================================
+
++-----------------------------+-------+--------+
+| Feature                     | C++   | Python |
+|                             |       |        |
++=============================+=======+========+
+| Stream export               | ✓     | ✓      |
++-----------------------------+-------+--------+
+| Stream import               | ✓     | ✓      |
++-----------------------------+-------+--------+
+
+.. seealso::
+   The :ref:`C Stream Interface <c-stream-interface>` specification.
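
As a usage illustration of the feature documented above (not part of the patch): a minimal C++ sketch that enables the new emit_dictionary_deltas option on a stream writer and inspects the WriteStats counters afterwards. The helper name WriteWithDeltas and the in-memory sink are hypothetical; the sketch assumes all batches share one schema and that later dictionaries extend earlier ones, as in the tests above.

#include <iostream>
#include <memory>
#include <vector>

#include "arrow/io/memory.h"
#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"

// Hypothetical helper: write `batches` to an in-memory IPC stream with
// delta emission enabled, then report the new WriteStats counters.
arrow::Status WriteWithDeltas(
    const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
  auto options = arrow::ipc::IpcWriteOptions::Defaults();
  options.emit_dictionary_deltas = true;  // off by default for compatibility
  ARROW_ASSIGN_OR_RAISE(auto sink, arrow::io::BufferOutputStream::Create());
  ARROW_ASSIGN_OR_RAISE(
      auto writer, arrow::ipc::MakeStreamWriter(sink, batches[0]->schema(), options));
  for (const auto& batch : batches) {
    // Each call may emit a dictionary batch (full or delta) before the
    // record batch itself; the stats below break that down.
    ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
  }
  ARROW_RETURN_NOT_OK(writer->Close());
  const arrow::ipc::WriteStats stats = writer->stats();
  std::cout << "dictionary batches: " << stats.num_dictionary_batches
            << ", deltas: " << stats.num_dictionary_deltas
            << ", replacements: " << stats.num_replaced_dictionaries << std::endl;
  return sink->Close();
}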
+
+
 Third-Party Data Formats
 ========================
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 4d49532715f..45a39061919 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1359,6 +1359,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         CMetadataVersion metadata_version
         shared_ptr[CCodec] codec
         c_bool use_threads
+        c_bool emit_dictionary_deltas
 
         @staticmethod
         CIpcWriteOptions Defaults()
@@ -1371,6 +1372,20 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         @staticmethod
         CIpcReadOptions Defaults()
 
+    cdef cppclass CIpcWriteStats" arrow::ipc::WriteStats":
+        int64_t num_messages
+        int64_t num_record_batches
+        int64_t num_dictionary_batches
+        int64_t num_dictionary_deltas
+        int64_t num_replaced_dictionaries
+
+    cdef cppclass CIpcReadStats" arrow::ipc::ReadStats":
+        int64_t num_messages
+        int64_t num_record_batches
+        int64_t num_dictionary_batches
+        int64_t num_dictionary_deltas
+        int64_t num_replaced_dictionaries
+
     cdef cppclass CDictionaryMemo" arrow::ipc::DictionaryMemo":
         pass
 
@@ -1409,6 +1424,8 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         CStatus WriteRecordBatch(const CRecordBatch& batch)
         CStatus WriteTable(const CTable& table, int64_t max_chunksize)
 
+        CIpcWriteStats stats()
+
     cdef cppclass CRecordBatchStreamReader \
             " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader):
         @staticmethod
@@ -1420,13 +1437,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
             unique_ptr[CMessageReader] message_reader,
             const CIpcReadOptions& options)
 
-    CResult[shared_ptr[CRecordBatchWriter]] MakeStreamWriter(
-        shared_ptr[COutputStream] sink, const shared_ptr[CSchema]& schema,
-        CIpcWriteOptions& options)
-
-    CResult[shared_ptr[CRecordBatchWriter]] MakeFileWriter(
-        shared_ptr[COutputStream] sink, const shared_ptr[CSchema]& schema,
-        CIpcWriteOptions& options)
+        CIpcReadStats stats()
 
     cdef cppclass CRecordBatchFileReader \
             " arrow::ipc::RecordBatchFileReader":
@@ -1446,6 +1457,16 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
 
         CResult[shared_ptr[CRecordBatch]] ReadRecordBatch(int i)
 
+        CIpcReadStats stats()
+
+    CResult[shared_ptr[CRecordBatchWriter]] MakeStreamWriter(
+        shared_ptr[COutputStream] sink, const shared_ptr[CSchema]& schema,
+        CIpcWriteOptions& options)
+
+    CResult[shared_ptr[CRecordBatchWriter]] MakeFileWriter(
+        shared_ptr[COutputStream] sink, const shared_ptr[CSchema]& schema,
+        CIpcWriteOptions& options)
+
 CResult[unique_ptr[CMessage]] ReadMessage(CInputStream* stream,
                                           CMemoryPool* pool)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index b1f7a9e51eb..7d3f7004816 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from collections import namedtuple
 import warnings
 
@@ -45,6 +46,44 @@ cdef CMetadataVersion _unwrap_metadata_version(
     raise ValueError("Not a metadata version: " + repr(version))
 
 
+_WriteStats = namedtuple(
+    'WriteStats',
+    ('num_messages', 'num_record_batches', 'num_dictionary_batches',
+     'num_dictionary_deltas', 'num_replaced_dictionaries'))
+
+
+class WriteStats(_WriteStats):
+    """IPC write statistics
+    """
+    __slots__ = ()
+
+
+@staticmethod
+cdef _wrap_write_stats(CIpcWriteStats c):
+    return WriteStats(c.num_messages, c.num_record_batches,
+                      c.num_dictionary_batches, c.num_dictionary_deltas,
+                      c.num_replaced_dictionaries)
+
+
+_ReadStats = namedtuple(
+    'ReadStats',
+    ('num_messages', 'num_record_batches', 'num_dictionary_batches',
+     'num_dictionary_deltas', 'num_replaced_dictionaries'))
+
+
+class ReadStats(_ReadStats):
+    """IPC read statistics
+    """
+    __slots__ = ()
+
+
+@staticmethod
+cdef _wrap_read_stats(CIpcReadStats c):
+    return ReadStats(c.num_messages, c.num_record_batches,
+                     c.num_dictionary_batches, c.num_dictionary_deltas,
+                     c.num_replaced_dictionaries)
+
+
 cdef class IpcWriteOptions(_Weakrefable):
     """Serialization options for the IPC format.
 
@@ -61,6 +100,9 @@ cdef class IpcWriteOptions(_Weakrefable):
     use_threads: bool
         Whether to use the global CPU thread pool to parallelize any
         computational tasks like compression.
+    emit_dictionary_deltas: bool
+        Whether to emit dictionary deltas.  Default is false for maximum
+        stream compatibility.
     """
     __slots__ = ()
 
@@ -68,13 +110,14 @@ cdef class IpcWriteOptions(_Weakrefable):
     def __init__(self, *, metadata_version=MetadataVersion.V5,
                  use_legacy_format=False, compression=None,
-                 bint use_threads=True):
+                 bint use_threads=True, bint emit_dictionary_deltas=False):
         self.c_options = CIpcWriteOptions.Defaults()
         self.use_legacy_format = use_legacy_format
         self.metadata_version = metadata_version
         if compression is not None:
             self.compression = compression
         self.use_threads = use_threads
+        self.emit_dictionary_deltas = emit_dictionary_deltas
 
     @property
     def use_legacy_format(self):
@@ -115,6 +158,14 @@ cdef class IpcWriteOptions(_Weakrefable):
     def use_threads(self, bint value):
         self.c_options.use_threads = value
 
+    @property
+    def emit_dictionary_deltas(self):
+        return self.c_options.emit_dictionary_deltas
+
+    @emit_dictionary_deltas.setter
+    def emit_dictionary_deltas(self, bint value):
+        self.c_options.emit_dictionary_deltas = value
+
 
 cdef class Message(_Weakrefable):
     """
@@ -356,6 +407,15 @@ cdef class _CRecordBatchWriter(_Weakrefable):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.close()
 
+    @property
+    def stats(self):
+        """
+        Current IPC write statistics.
+        """
+        if not self.writer:
+            raise ValueError("Operation on closed writer")
+        return _wrap_write_stats(self.writer.get().stats())
+
 
 cdef class _RecordBatchStreamWriter(_CRecordBatchWriter):
     cdef:
@@ -568,6 +628,7 @@ cdef class _RecordBatchStreamReader(RecordBatchReader):
     cdef:
         shared_ptr[CInputStream] in_stream
         CIpcReadOptions options
+        CRecordBatchStreamReader* stream_reader
 
     def __cinit__(self):
         pass
@@ -577,6 +638,16 @@ cdef class _RecordBatchStreamReader(RecordBatchReader):
         with nogil:
             self.reader = GetResultValue(CRecordBatchStreamReader.Open(
                 self.in_stream, self.options))
+        self.stream_reader = <CRecordBatchStreamReader*> self.reader.get()
+
+    @property
+    def stats(self):
+        """
+        Current IPC read statistics.
+ """ + if not self.reader: + raise ValueError("Operation on closed reader") + return _wrap_read_stats(self.stream_reader.stats()) cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter): @@ -678,6 +749,15 @@ cdef class _RecordBatchFileReader(_Weakrefable): def __exit__(self, exc_type, exc_value, traceback): pass + @property + def stats(self): + """ + Current IPC read statistics. + """ + if not self.reader: + raise ValueError("Operation on closed reader") + return _wrap_read_stats(self.reader.get().stats()) + def get_tensor_size(Tensor tensor): """ diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index 65325c483c4..049d0c95c4b 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -21,7 +21,8 @@ import pyarrow as pa -from pyarrow.lib import (IpcWriteOptions, Message, MessageReader, # noqa +from pyarrow.lib import (IpcWriteOptions, ReadStats, WriteStats, # noqa + Message, MessageReader, RecordBatchReader, _ReadPandasMixin, MetadataVersion, read_message, read_record_batch, read_schema, diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index 2306183b0b9..3132ddc94ef 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -37,6 +37,7 @@ class IpcFixture: + write_stats = None def __init__(self, sink_factory=lambda: io.BytesIO()): self._sink_factory = sink_factory @@ -69,6 +70,7 @@ def write_batches(self, num_batches=5, as_table=False): for batch in batches: writer.write_batch(batch) + self.write_stats = writer.stats writer.close() return batches @@ -92,6 +94,10 @@ def _check_roundtrip(self, as_table=False): assert batches[i].equals(batch) assert reader.schema.equals(batches[0].schema) + assert isinstance(reader.stats, pa.ipc.ReadStats) + assert isinstance(self.write_stats, pa.ipc.WriteStats) + assert tuple(reader.stats) == tuple(self.write_stats) + class StreamFormatFixture(IpcFixture): @@ -177,6 +183,12 @@ def test_open_file_from_buffer(file_fixture): assert result1.equals(result2) assert result1.equals(result3) + st1 = reader1.stats + assert st1.num_messages == 6 + assert st1.num_record_batches == 5 + assert reader2.stats == st1 + assert reader3.stats == st1 + @pytest.mark.pandas def test_file_read_pandas(file_fixture): @@ -243,6 +255,14 @@ def test_open_stream_from_buffer(stream_fixture): assert result1.equals(result2) assert result1.equals(result3) + st1 = reader1.stats + assert st1.num_messages == 6 + assert st1.num_record_batches == 5 + assert reader2.stats == st1 + assert reader3.stats == st1 + + assert tuple(st1) == tuple(stream_fixture.write_stats) + @pytest.mark.pandas def test_stream_write_dispatch(stream_fixture): @@ -387,6 +407,42 @@ def test_stream_options_roundtrip(stream_fixture, options): reader.read_next_batch() +def test_dictionary_delta(stream_fixture): + ty = pa.dictionary(pa.int8(), pa.utf8()) + data = [["foo", "foo", None], + ["foo", "bar", "foo"], # potential delta + ["foo", "bar"], + ["foo", None, "bar", "quux"], # potential delta + ["bar", "quux"], # replacement + ] + batches = [ + pa.RecordBatch.from_arrays([pa.array(v, type=ty)], names=['dicts']) + for v in data] + schema = batches[0].schema + + def write_batches(): + with stream_fixture._get_writer(pa.MockOutputStream(), + schema) as writer: + for batch in batches: + writer.write_batch(batch) + return writer.stats + + st = write_batches() + assert st.num_record_batches == 5 + assert st.num_dictionary_batches == 4 + assert st.num_replaced_dictionaries == 3 + assert st.num_dictionary_deltas == 0 + + 
+    stream_fixture.use_legacy_ipc_format = None
+    stream_fixture.options = pa.ipc.IpcWriteOptions(
+        emit_dictionary_deltas=True)
+    st = write_batches()
+    assert st.num_record_batches == 5
+    assert st.num_dictionary_batches == 4
+    assert st.num_replaced_dictionaries == 1
+    assert st.num_dictionary_deltas == 2
+
+
 def test_envvar_set_legacy_ipc_format():
     schema = pa.schema([pa.field('foo', pa.int32())])
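
To round out the Python test above from the C++ side, here is a minimal sketch of the StreamDecoder::stats() accessor added in this change. It is illustrative only: DecodeAndReport is a hypothetical helper, and stream_buffer is assumed to hold a complete IPC stream such as the one produced in test_dictionary_delta.

#include <iostream>
#include <memory>

#include "arrow/buffer.h"
#include "arrow/ipc/reader.h"
#include "arrow/status.h"

// Minimal sketch: feed a complete IPC stream to the decoder and inspect
// the newly exposed ReadStats.
arrow::Status DecodeAndReport(std::shared_ptr<arrow::Buffer> stream_buffer) {
  auto listener = std::make_shared<arrow::ipc::CollectListener>();
  arrow::ipc::StreamDecoder decoder(listener);
  ARROW_RETURN_NOT_OK(decoder.Consume(std::move(stream_buffer)));
  // The decoder now records the DictionaryKind of each dictionary batch
  // instead of discarding it, so deltas and replacements are counted.
  const arrow::ipc::ReadStats stats = decoder.stats();
  std::cout << "messages: " << stats.num_messages
            << ", record batches: " << stats.num_record_batches
            << ", deltas: " << stats.num_dictionary_deltas
            << ", replacements: " << stats.num_replaced_dictionaries << std::endl;
  return arrow::Status::OK();
}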