diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh index d50a57fb32c..44e6670b1b0 100755 --- a/ci/scripts/integration_arrow.sh +++ b/ci/scripts/integration_arrow.sh @@ -23,7 +23,10 @@ arrow_dir=${1} source_dir=${1}/cpp build_dir=${2}/cpp gold_dir_0_14_1=$arrow_dir/testing/data/arrow-ipc-stream/integration/0.14.1 +gold_dir_0_17_1=$arrow_dir/testing/data/arrow-ipc-stream/integration/0.17.1 pip install -e $arrow_dir/dev/archery -archery integration --with-all --run-flight --gold-dirs=$gold_dir_0_14_1 +archery integration --with-all --run-flight \ + --gold-dirs=$gold_dir_0_14_1 \ + --gold-dirs=$gold_dir_0_17_1 \ diff --git a/cpp/src/arrow/ipc/file_to_stream.cc b/cpp/src/arrow/ipc/file_to_stream.cc index ba391346441..8570b6f3051 100644 --- a/cpp/src/arrow/ipc/file_to_stream.cc +++ b/cpp/src/arrow/ipc/file_to_stream.cc @@ -39,7 +39,8 @@ Status ConvertToStream(const char* path) { ARROW_ASSIGN_OR_RAISE(auto in_file, io::ReadableFile::Open(path)); ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(in_file.get())); - ARROW_ASSIGN_OR_RAISE(auto writer, ipc::NewStreamWriter(&sink, reader->schema())); + ARROW_ASSIGN_OR_RAISE(auto writer, ipc::NewStreamWriter(&sink, reader->schema(), + IpcWriteOptions::Defaults())); for (int i = 0; i < reader->num_record_batches(); ++i) { ARROW_ASSIGN_OR_RAISE(std::shared_ptr chunk, reader->ReadRecordBatch(i)); RETURN_NOT_OK(writer->WriteRecordBatch(*chunk)); diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index d531cb04643..190fe7b7152 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -76,14 +76,42 @@ MetadataVersion GetMetadataVersion(flatbuf::MetadataVersion version) { // Arrow 0.3 to 0.7.1 return MetadataVersion::V4; case flatbuf::MetadataVersion::V4: - // Arrow >= 0.8 + // Arrow 0.8 to 0.17 return MetadataVersion::V4; + case flatbuf::MetadataVersion::V5: + // Arrow >= 1.0 + return MetadataVersion::V5; // Add cases as other versions become available default: - return MetadataVersion::V4; + return MetadataVersion::V5; } } +flatbuf::MetadataVersion MetadataVersionToFlatbuffer(MetadataVersion version) { + switch (version) { + case MetadataVersion::V1: + return flatbuf::MetadataVersion::V1; + case MetadataVersion::V2: + return flatbuf::MetadataVersion::V2; + case MetadataVersion::V3: + return flatbuf::MetadataVersion::V3; + case MetadataVersion::V4: + return flatbuf::MetadataVersion::V4; + case MetadataVersion::V5: + return flatbuf::MetadataVersion::V5; + // Add cases as other versions become available + default: + return flatbuf::MetadataVersion::V5; + } +} + +bool HasValidityBitmap(Type::type type_id, MetadataVersion version) { + // In V4, null types have no validity bitmap + // In V5 and later, null and union types have no validity bitmap + return (version < MetadataVersion::V5) ? 
(type_id != Type::NA) + : ::arrow::internal::HasValidityBitmap(type_id); +} + namespace { Status IntFromFlatbuffer(const flatbuf::Int* int_data, std::shared_ptr* out) { @@ -835,11 +863,11 @@ Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictio Result> WriteFBMessage( FBB& fbb, flatbuf::MessageHeader header_type, flatbuffers::Offset header, - int64_t body_length, + int64_t body_length, MetadataVersion version, const std::shared_ptr& custom_metadata = nullptr) { - auto message = - flatbuf::CreateMessage(fbb, kCurrentMetadataVersion, header_type, header, - body_length, SerializeCustomMetadata(fbb, custom_metadata)); + auto message = flatbuf::CreateMessage(fbb, MetadataVersionToFlatbuffer(version), + header_type, header, body_length, + SerializeCustomMetadata(fbb, custom_metadata)); fbb.Finish(message); return WriteFlatbufferBuilder(fbb); } @@ -1142,12 +1170,12 @@ Status GetKeyValueMetadata(const KVVector* fb_metadata, } Status WriteSchemaMessage(const Schema& schema, DictionaryMemo* dictionary_memo, - std::shared_ptr* out) { + const IpcWriteOptions& options, std::shared_ptr* out) { FBB fbb; flatbuffers::Offset fb_schema; RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema)); return WriteFBMessage(fbb, flatbuf::MessageHeader::Schema, fb_schema.Union(), - /*body_length=*/0) + /*body_length=*/0, options.metadata_version) .Value(out); } @@ -1161,12 +1189,13 @@ Status WriteRecordBatchMessage( RETURN_NOT_OK( MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch)); return WriteFBMessage(fbb, flatbuf::MessageHeader::RecordBatch, record_batch.Union(), - body_length, custom_metadata) + body_length, options.metadata_version, custom_metadata) .Value(out); } Result> WriteTensorMessage(const Tensor& tensor, - int64_t buffer_start_offset) { + int64_t buffer_start_offset, + const IpcWriteOptions& options) { using TensorDimOffset = flatbuffers::Offset; using TensorOffset = flatbuffers::Offset; @@ -1194,18 +1223,18 @@ Result> WriteTensorMessage(const Tensor& tensor, flatbuf::CreateTensor(fbb, fb_type_type, fb_type, fb_shape, fb_strides, &buffer); return WriteFBMessage(fbb, flatbuf::MessageHeader::Tensor, fb_tensor.Union(), - body_length); + body_length, options.metadata_version); } Result> WriteSparseTensorMessage( const SparseTensor& sparse_tensor, int64_t body_length, - const std::vector& buffers) { + const std::vector& buffers, const IpcWriteOptions& options) { FBB fbb; SparseTensorOffset fb_sparse_tensor; RETURN_NOT_OK( MakeSparseTensor(fbb, sparse_tensor, body_length, buffers, &fb_sparse_tensor)); return WriteFBMessage(fbb, flatbuf::MessageHeader::SparseTensor, - fb_sparse_tensor.Union(), body_length); + fb_sparse_tensor.Union(), body_length, options.metadata_version); } Status WriteDictionaryMessage( @@ -1220,7 +1249,7 @@ Status WriteDictionaryMessage( auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch, is_delta).Union(); return WriteFBMessage(fbb, flatbuf::MessageHeader::DictionaryBatch, dictionary_batch, - body_length, custom_metadata) + body_length, options.metadata_version, custom_metadata) .Value(out); } diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h index 5c1a032042b..8f432c43632 100644 --- a/cpp/src/arrow/ipc/metadata_internal.h +++ b/cpp/src/arrow/ipc/metadata_internal.h @@ -29,6 +29,7 @@ #include #include "arrow/buffer.h" +#include "arrow/io/type_fwd.h" #include "arrow/ipc/message.h" #include "arrow/result.h" #include "arrow/sparse_tensor.h" @@ -45,16 
+46,6 @@ namespace arrow { namespace flatbuf = org::apache::arrow::flatbuf; -class DataType; -class KeyValueMetadata; -class Schema; - -namespace io { - -class OutputStream; - -} // namespace io - namespace ipc { class DictionaryMemo; @@ -68,13 +59,23 @@ using KVVector = flatbuffers::Vector; constexpr int32_t kIpcContinuationToken = -1; static constexpr flatbuf::MetadataVersion kCurrentMetadataVersion = - flatbuf::MetadataVersion::V4; + flatbuf::MetadataVersion::V5; + +static constexpr flatbuf::MetadataVersion kLatestMetadataVersion = + flatbuf::MetadataVersion::V5; static constexpr flatbuf::MetadataVersion kMinMetadataVersion = flatbuf::MetadataVersion::V4; MetadataVersion GetMetadataVersion(flatbuf::MetadataVersion version); +// This function is used in a unit test +ARROW_EXPORT +flatbuf::MetadataVersion MetadataVersionToFlatbuffer(MetadataVersion version); + +// Whether the type has a validity bitmap in the given IPC version +bool HasValidityBitmap(Type::type type_id, MetadataVersion version); + static constexpr const char* kArrowMagicBytes = "ARROW1"; struct FieldMetadata { @@ -172,7 +173,7 @@ static inline Status VerifyMessage(const uint8_t* data, int64_t size, // \param[out] out the serialized arrow::Buffer // \return Status outcome Status WriteSchemaMessage(const Schema& schema, DictionaryMemo* dictionary_memo, - std::shared_ptr* out); + const IpcWriteOptions& options, std::shared_ptr* out); // This function is used in a unit test ARROW_EXPORT @@ -183,11 +184,12 @@ Status WriteRecordBatchMessage( const IpcWriteOptions& options, std::shared_ptr* out); Result> WriteTensorMessage(const Tensor& tensor, - const int64_t buffer_start_offset); + const int64_t buffer_start_offset, + const IpcWriteOptions& options); Result> WriteSparseTensorMessage( const SparseTensor& sparse_tensor, int64_t body_length, - const std::vector& buffers); + const std::vector& buffers, const IpcWriteOptions& options); Status WriteFileFooter(const Schema& schema, const std::vector& dictionaries, const std::vector& record_batches, diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h index 385fc681449..69e248c6182 100644 --- a/cpp/src/arrow/ipc/options.h +++ b/cpp/src/arrow/ipc/options.h @@ -67,9 +67,11 @@ struct ARROW_EXPORT IpcWriteOptions { /// like compression bool use_threads = true; - /// \brief Format version to use for IPC messages and their - /// metadata. Presently using V4 version (readable by v0.8.0 and later). - MetadataVersion metadata_version = MetadataVersion::V4; + /// \brief Format version to use for IPC messages and their metadata. + /// + /// Presently using V5 version (readable by 1.0.0 and later). + /// V4 is also available (readable by 0.8.0 and later). 
+ MetadataVersion metadata_version = MetadataVersion::V5; static IpcWriteOptions Defaults(); }; diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 40f8cc0a954..1abf0907f65 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -63,6 +63,24 @@ namespace test { using BatchVector = std::vector>; +const std::vector kMetadataVersions = {MetadataVersion::V4, + MetadataVersion::V5}; + +class TestMessage : public ::testing::TestWithParam { + public: + void SetUp() { + version_ = GetParam(); + fb_version_ = internal::MetadataVersionToFlatbuffer(version_); + options_ = IpcWriteOptions::Defaults(); + options_.metadata_version = version_; + } + + protected: + MetadataVersion version_; + flatbuf::MetadataVersion fb_version_; + IpcWriteOptions options_; +}; + TEST(TestMessage, Equals) { std::string metadata = "foo"; std::string body = "bar"; @@ -89,13 +107,12 @@ TEST(TestMessage, Equals) { ASSERT_FALSE(msg5.Equals(msg1)); } -TEST(TestMessage, SerializeTo) { +TEST_P(TestMessage, SerializeTo) { const int64_t body_length = 64; flatbuffers::FlatBufferBuilder fbb; - fbb.Finish(flatbuf::CreateMessage(fbb, internal::kCurrentMetadataVersion, - flatbuf::MessageHeader::RecordBatch, 0 /* header */, - body_length)); + fbb.Finish(flatbuf::CreateMessage(fbb, fb_version_, flatbuf::MessageHeader::RecordBatch, + 0 /* header */, body_length)); std::shared_ptr metadata; ASSERT_OK_AND_ASSIGN(metadata, internal::WriteFlatbufferBuilder(fbb)); @@ -106,12 +123,11 @@ TEST(TestMessage, SerializeTo) { Message::Open(metadata, std::make_shared(body))); auto CheckWithAlignment = [&](int32_t alignment) { - IpcWriteOptions options; - options.alignment = alignment; + options_.alignment = alignment; const int32_t prefix_size = 8; int64_t output_length = 0; ASSERT_OK_AND_ASSIGN(auto stream, io::BufferOutputStream::Create(1 << 10)); - ASSERT_OK(message->SerializeTo(stream.get(), options, &output_length)); + ASSERT_OK(message->SerializeTo(stream.get(), options_, &output_length)); ASSERT_EQ(BitUtil::RoundUp(metadata->size() + prefix_size, alignment) + body_length, output_length); ASSERT_OK_AND_EQ(output_length, stream->Tell()); @@ -121,7 +137,7 @@ TEST(TestMessage, SerializeTo) { CheckWithAlignment(64); } -TEST(TestMessage, SerializeCustomMetadata) { +TEST_P(TestMessage, SerializeCustomMetadata) { std::vector> cases = { nullptr, key_value_metadata({}, {}), key_value_metadata({"foo", "bar"}, {"fizz", "buzz"})}; @@ -130,7 +146,7 @@ TEST(TestMessage, SerializeCustomMetadata) { ASSERT_OK(internal::WriteRecordBatchMessage( /*length=*/0, /*body_length=*/0, metadata, /*nodes=*/{}, - /*buffers=*/{}, IpcWriteOptions::Defaults(), &serialized)); + /*buffers=*/{}, options_, &serialized)); ASSERT_OK_AND_ASSIGN(std::unique_ptr message, Message::Open(serialized, /*body=*/nullptr)); @@ -148,20 +164,19 @@ void BuffersOverlapEquals(const Buffer& left, const Buffer& right) { ASSERT_TRUE(left.Equals(right, std::min(left.size(), right.size()))); } -TEST(TestMessage, LegacyIpcBackwardsCompatibility) { +TEST_P(TestMessage, LegacyIpcBackwardsCompatibility) { std::shared_ptr batch; ASSERT_OK(MakeIntBatchSized(36, &batch)); - auto RoundtripWithOptions = [&](const IpcWriteOptions& arg_options, - std::shared_ptr* out_serialized, + auto RoundtripWithOptions = [&](std::shared_ptr* out_serialized, std::unique_ptr* out) { IpcPayload payload; - ASSERT_OK(GetRecordBatchPayload(*batch, arg_options, &payload)); + ASSERT_OK(GetRecordBatchPayload(*batch, options_, &payload)); 
ASSERT_OK_AND_ASSIGN(auto stream, io::BufferOutputStream::Create(1 << 20)); int32_t metadata_length = -1; - ASSERT_OK(WriteIpcPayload(payload, arg_options, stream.get(), &metadata_length)); + ASSERT_OK(WriteIpcPayload(payload, options_, stream.get(), &metadata_length)); ASSERT_OK_AND_ASSIGN(*out_serialized, stream->Finish()); io::BufferReader io_reader(*out_serialized); @@ -171,14 +186,13 @@ TEST(TestMessage, LegacyIpcBackwardsCompatibility) { std::shared_ptr serialized, legacy_serialized; std::unique_ptr message, legacy_message; - IpcWriteOptions options; - RoundtripWithOptions(options, &serialized, &message); + RoundtripWithOptions(&serialized, &message); // First 4 bytes 0xFFFFFFFF Continuation marker ASSERT_EQ(-1, util::SafeLoadAs(serialized->data())); - options.write_legacy_ipc_format = true; - RoundtripWithOptions(options, &legacy_serialized, &legacy_message); + options_.write_legacy_ipc_format = true; + RoundtripWithOptions(&legacy_serialized, &legacy_message); // Check that the continuation marker is not written ASSERT_NE(-1, util::SafeLoadAs(legacy_serialized->data())); @@ -196,11 +210,14 @@ TEST(TestMessage, Verify) { ASSERT_FALSE(message.Verify()); } +INSTANTIATE_TEST_SUITE_P(TestMessage, TestMessage, + ::testing::ValuesIn(kMetadataVersions)); + class TestSchemaMetadata : public ::testing::Test { public: void SetUp() {} - void CheckRoundtrip(const Schema& schema) { + void CheckSchemaRoundtrip(const Schema& schema) { DictionaryMemo in_memo, out_memo; ASSERT_OK_AND_ASSIGN(std::shared_ptr buffer, SerializeSchema(schema, &out_memo, default_memory_pool())); @@ -227,7 +244,7 @@ TEST_F(TestSchemaMetadata, PrimitiveFields) { auto f10 = field("f10", std::make_shared()); Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10}); - CheckRoundtrip(schema); + CheckSchemaRoundtrip(schema); } TEST_F(TestSchemaMetadata, NestedFields) { @@ -239,7 +256,7 @@ TEST_F(TestSchemaMetadata, NestedFields) { auto f1 = field("f1", type2); Schema schema({f0, f1}); - CheckRoundtrip(schema); + CheckSchemaRoundtrip(schema); } TEST_F(TestSchemaMetadata, DictionaryFields) { @@ -249,14 +266,14 @@ TEST_F(TestSchemaMetadata, DictionaryFields) { auto f1 = field("f1", list(dict_type)); Schema schema({f0, f1}); - CheckRoundtrip(schema); + CheckSchemaRoundtrip(schema); } { auto dict_type = dictionary(int8(), list(int32())); auto f0 = field("f0", dict_type); Schema schema({f0}); - CheckRoundtrip(schema); + CheckSchemaRoundtrip(schema); } } @@ -266,7 +283,7 @@ TEST_F(TestSchemaMetadata, NestedDictionaryFields) { auto dict_type = dictionary(int16(), list(inner_dict_type)); Schema schema({field("f0", dict_type)}); - CheckRoundtrip(schema); + CheckSchemaRoundtrip(schema); } { auto dict_type1 = dictionary(int8(), utf8(), /*ordered=*/true); @@ -279,7 +296,7 @@ TEST_F(TestSchemaMetadata, NestedDictionaryFields) { Schema schema({field("f1", dictionary(int32(), struct_type1)), field("f2", dictionary(int32(), struct_type2))}); - CheckRoundtrip(schema); + CheckSchemaRoundtrip(schema); } } @@ -291,7 +308,7 @@ TEST_F(TestSchemaMetadata, KeyValueMetadata) { auto f1 = field("f1", std::make_shared(), false, field_metadata); Schema schema({f0, f1}, schema_metadata); - CheckRoundtrip(schema); + CheckSchemaRoundtrip(schema); } #define BATCH_CASES() \ @@ -400,6 +417,7 @@ class IpcTestFixture : public io::MemoryMapFixture, public ExtensionTypesMixin { ASSERT_OK_AND_ASSIGN(result, DoLargeRoundTrip(batch, /*zero_data=*/true)); CheckReadResult(*result, batch); } + void CheckRoundtrip(const std::shared_ptr& array, IpcWriteOptions options = 
IpcWriteOptions::Defaults(), int64_t buffer_size = 1 << 20) { @@ -427,34 +445,46 @@ class TestIpcRoundTrip : public ::testing::TestWithParam, public: void SetUp() { IpcTestFixture::SetUp(); } void TearDown() { IpcTestFixture::TearDown(); } + + void TestMetadataVersion(MetadataVersion expected_version) { + std::shared_ptr batch; + ASSERT_OK(MakeIntRecordBatch(&batch)); + + mmap_.reset(); // Ditch previous mmap view, to avoid errors on Windows + ASSERT_OK_AND_ASSIGN(mmap_, + io::MemoryMapFixture::InitMemoryMap(1 << 16, "test-metadata")); + + int32_t metadata_length; + int64_t body_length; + const int64_t buffer_offset = 0; + ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(), &metadata_length, + &body_length, options_)); + + ASSERT_OK_AND_ASSIGN(std::unique_ptr message, + ReadMessage(0, metadata_length, mmap_.get())); + ASSERT_EQ(expected_version, message->metadata_version()); + } }; TEST_P(TestIpcRoundTrip, RoundTrip) { std::shared_ptr batch; ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue - CheckRoundtrip(*batch); + for (const auto version : kMetadataVersions) { + options_.metadata_version = version; + CheckRoundtrip(*batch); + } } -TEST_F(TestIpcRoundTrip, MetadataVersion) { - std::shared_ptr batch; - ASSERT_OK(MakeIntRecordBatch(&batch)); - - ASSERT_OK_AND_ASSIGN(mmap_, - io::MemoryMapFixture::InitMemoryMap(1 << 16, "test-metadata")); - - int32_t metadata_length; - int64_t body_length; - - const int64_t buffer_offset = 0; - - ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(), &metadata_length, - &body_length, options_)); - - ASSERT_OK_AND_ASSIGN(std::unique_ptr message, - ReadMessage(0, metadata_length, mmap_.get())); +TEST_F(TestIpcRoundTrip, DefaultMetadataVersion) { + TestMetadataVersion(MetadataVersion::V5); +} - ASSERT_EQ(MetadataVersion::V4, message->metadata_version()); +TEST_F(TestIpcRoundTrip, SpecificMetadataVersion) { + options_.metadata_version = MetadataVersion::V4; + TestMetadataVersion(MetadataVersion::V4); + options_.metadata_version = MetadataVersion::V5; + TestMetadataVersion(MetadataVersion::V5); } TEST(TestReadMessage, CorruptedSmallInput) { diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 2df10f6949b..3c51fef033c 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -43,6 +43,7 @@ #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit_util.h" +#include "arrow/util/bitmap_ops.h" #include "arrow/util/checked_cast.h" #include "arrow/util/compression.h" #include "arrow/util/key_value_metadata.h" @@ -109,9 +110,11 @@ Status InvalidMessageType(MessageType expected, MessageType actual) { class ArrayLoader { public: explicit ArrayLoader(const flatbuf::RecordBatch* metadata, + MetadataVersion metadata_version, const DictionaryMemo* dictionary_memo, const IpcReadOptions& options, io::RandomAccessFile* file) : metadata_(metadata), + metadata_version_(metadata_version), file_(file), dictionary_memo_(dictionary_memo), max_recursion_depth_(options.max_recursion_depth) {} @@ -186,11 +189,10 @@ class ArrayLoader { // we can skip that buffer without reading from shared memory RETURN_NOT_OK(GetFieldMetadata(field_index_++, out_)); - if (::arrow::internal::HasValidityBitmap(type_id)) { - // Extract null_bitmap which is common to all arrays except for unions. 
- if (out_->null_count == 0) { - out_->buffers[0] = nullptr; - } else { + if (internal::HasValidityBitmap(type_id, metadata_version_)) { + // Extract null_bitmap which is common to all arrays except for unions + // and nulls. + if (out_->null_count != 0) { RETURN_NOT_OK(GetBuffer(buffer_index_, &out_->buffers[0])); } buffer_index_++; @@ -312,9 +314,22 @@ class ArrayLoader { RETURN_NOT_OK(LoadCommon(type.id())); - // Validity bitmap placeholder like for NullType, which is never sent or - // received in IPC. + // With metadata V4, we can get a validity bitmap. + // Trying to fix up union data to do without the top-level validity bitmap + // is hairy: + // - type ids must be rewritten to all have valid values (even for former + // null slots) + // - sparse union children must have their validity bitmaps rewritten + // by ANDing the top-level validity bitmap + // - dense union children must be rewritten (at least one of them) + // to insert the required null slots that were formerly omitted + // So instead we bail out. + if (out_->null_count != 0 && out_->buffers[0] != nullptr) { + return Status::Invalid( + "Cannot read pre-1.0.0 Union array with top-level validity bitmap"); + } out_->buffers[0] = nullptr; + out_->null_count = 0; if (out_->length > 0) { RETURN_NOT_OK(GetBuffer(buffer_index_, &out_->buffers[1])); @@ -343,6 +358,7 @@ class ArrayLoader { private: const flatbuf::RecordBatch* metadata_; + const MetadataVersion metadata_version_; io::RandomAccessFile* file_; const DictionaryMemo* dictionary_memo_; int max_recursion_depth_; @@ -426,9 +442,9 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& op Result> LoadRecordBatchSubset( const flatbuf::RecordBatch* metadata, const std::shared_ptr& schema, const std::vector& inclusion_mask, const DictionaryMemo* dictionary_memo, - const IpcReadOptions& options, Compression::type compression, - io::RandomAccessFile* file) { - ArrayLoader loader(metadata, dictionary_memo, options, file); + const IpcReadOptions& options, MetadataVersion metadata_version, + Compression::type compression, io::RandomAccessFile* file) { + ArrayLoader loader(metadata, metadata_version, dictionary_memo, options, file); std::vector> field_data; std::vector> schema_fields; @@ -461,14 +477,14 @@ Result> LoadRecordBatchSubset( Result> LoadRecordBatch( const flatbuf::RecordBatch* metadata, const std::shared_ptr& schema, const std::vector& inclusion_mask, const DictionaryMemo* dictionary_memo, - const IpcReadOptions& options, Compression::type compression, - io::RandomAccessFile* file) { + const IpcReadOptions& options, MetadataVersion metadata_version, + Compression::type compression, io::RandomAccessFile* file) { if (inclusion_mask.size() > 0) { return LoadRecordBatchSubset(metadata, schema, inclusion_mask, dictionary_memo, - options, compression, file); + options, metadata_version, compression, file); } - ArrayLoader loader(metadata, dictionary_memo, options, file); + ArrayLoader loader(metadata, metadata_version, dictionary_memo, options, file); std::vector> arrays(schema->num_fields()); for (int i = 0; i < schema->num_fields(); ++i) { auto arr = std::make_shared(); @@ -576,7 +592,8 @@ Result> ReadRecordBatchInternal( RETURN_NOT_OK(GetCompressionExperimental(message, &compression)); } return LoadRecordBatch(batch, schema, inclusion_mask, dictionary_memo, options, - compression, file); + internal::GetMetadataVersion(message->version()), compression, + file); } // If we are selecting only certain fields, populate an inclusion mask for fast lookups. 
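For reference, the version gate that the reader and writer hunks above both rely on can be sketched in isolation. This is a minimal standalone model of the rule, not the Arrow-internal HasValidityBitmap helper itself; the simplified TypeId enum and the main() driver are assumptions for illustration only.

    #include <iostream>

    // Simplified stand-ins for arrow::ipc::MetadataVersion and arrow::Type::type.
    enum class MetadataVersion { V1, V2, V3, V4, V5 };
    enum class TypeId { INT32, NA, SPARSE_UNION, DENSE_UNION };

    // V4 and earlier: only Null arrays omit the validity bitmap.
    // V5 and later: Null and Union arrays omit it.
    bool HasValidityBitmap(TypeId id, MetadataVersion version) {
      if (id == TypeId::NA) return false;
      const bool is_union = (id == TypeId::SPARSE_UNION || id == TypeId::DENSE_UNION);
      return !(is_union && version >= MetadataVersion::V5);
    }

    int main() {
      std::cout << HasValidityBitmap(TypeId::SPARSE_UNION, MetadataVersion::V4) << "\n";  // 1: legacy unions carry a bitmap buffer
      std::cout << HasValidityBitmap(TypeId::SPARSE_UNION, MetadataVersion::V5) << "\n";  // 0: V5 unions have no bitmap buffer
      std::cout << HasValidityBitmap(TypeId::NA, MetadataVersion::V4) << "\n";            // 0: null arrays never have one
      return 0;
    }

The writer consults this predicate with options_.metadata_version, while the reader consults it with the version recovered from the incoming message, which is why ArrayLoader now carries metadata_version_.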
@@ -688,6 +705,7 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo, ARROW_ASSIGN_OR_RAISE( batch, LoadRecordBatch(batch_meta, ::arrow::schema({value_field}), /*field_inclusion_mask=*/{}, dictionary_memo, options, + internal::GetMetadataVersion(message->version()), compression, file)); if (batch->num_columns() != 1) { return Status::Invalid("Dictionary record batch must only contain one field"); diff --git a/cpp/src/arrow/ipc/stream_to_file.cc b/cpp/src/arrow/ipc/stream_to_file.cc index c2cec48babe..22518d3e46c 100644 --- a/cpp/src/arrow/ipc/stream_to_file.cc +++ b/cpp/src/arrow/ipc/stream_to_file.cc @@ -37,7 +37,8 @@ Status ConvertToFile() { io::StdoutStream sink; ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchStreamReader::Open(&input)); - ARROW_ASSIGN_OR_RAISE(auto writer, NewFileWriter(&sink, reader->schema())); + ARROW_ASSIGN_OR_RAISE( + auto writer, NewFileWriter(&sink, reader->schema(), IpcWriteOptions::Defaults())); std::shared_ptr batch; while (true) { ARROW_ASSIGN_OR_RAISE(batch, reader->Next()); diff --git a/cpp/src/arrow/ipc/type_fwd.h b/cpp/src/arrow/ipc/type_fwd.h index 4851984df02..bef9776c6a0 100644 --- a/cpp/src/arrow/ipc/type_fwd.h +++ b/cpp/src/arrow/ipc/type_fwd.h @@ -30,8 +30,11 @@ enum class MetadataVersion : char { /// 0.3.0 to 0.7.1 V3, - /// >= 0.8.0 - V4 + /// 0.8.0 to 0.17.0 + V4, + + /// >= 1.0.0 + V5 }; class Message; diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 0887151c474..292fe9cb2fc 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -138,8 +138,9 @@ class RecordBatchSerializer { // push back all common elements field_nodes_.push_back({arr.length(), arr.null_count(), 0}); - // Null and union types have no validity bitmap - if (::arrow::internal::HasValidityBitmap(arr.type_id())) { + // In V4, null types have no validity bitmap + // In V5 and later, null and union types have no validity bitmap + if (internal::HasValidityBitmap(arr.type_id(), options_.metadata_version)) { if (arr.null_count() > 0) { std::shared_ptr bitmap; RETURN_NOT_OK(GetTruncatedBitmap(arr.offset(), arr.length(), arr.null_bitmap(), @@ -597,7 +598,7 @@ Status WriteIpcPayload(const IpcPayload& payload, const IpcWriteOptions& options Status GetSchemaPayload(const Schema& schema, const IpcWriteOptions& options, DictionaryMemo* dictionary_memo, IpcPayload* out) { out->type = MessageType::SCHEMA; - return internal::WriteSchemaMessage(schema, dictionary_memo, &out->metadata); + return internal::WriteSchemaMessage(schema, dictionary_memo, options, &out->metadata); } Status GetDictionaryPayload(int64_t id, const std::shared_ptr& dictionary, @@ -654,10 +655,10 @@ namespace { Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length) { - std::shared_ptr metadata; - ARROW_ASSIGN_OR_RAISE(metadata, internal::WriteTensorMessage(tensor, 0)); IpcWriteOptions options; options.alignment = kTensorAlignment; + std::shared_ptr metadata; + ARROW_ASSIGN_OR_RAISE(metadata, internal::WriteTensorMessage(tensor, 0, options)); return WriteMessage(*metadata, options, dst, metadata_length); } @@ -745,8 +746,11 @@ Result> GetTensorMessage(const Tensor& tensor, tensor_to_write = temp_tensor.get(); } + IpcWriteOptions options; + options.alignment = kTensorAlignment; std::shared_ptr metadata; - ARROW_ASSIGN_OR_RAISE(metadata, internal::WriteTensorMessage(*tensor_to_write, 0)); + ARROW_ASSIGN_OR_RAISE(metadata, + internal::WriteTensorMessage(*tensor_to_write, 0, options)); return std::unique_ptr(new 
Message(metadata, tensor_to_write->data())); } @@ -755,7 +759,9 @@ namespace internal { class SparseTensorSerializer { public: SparseTensorSerializer(int64_t buffer_start_offset, IpcPayload* out) - : out_(out), buffer_start_offset_(buffer_start_offset) {} + : out_(out), + buffer_start_offset_(buffer_start_offset), + options_(IpcWriteOptions::Defaults()) {} ~SparseTensorSerializer() = default; @@ -791,7 +797,8 @@ class SparseTensorSerializer { } Status SerializeMetadata(const SparseTensor& sparse_tensor) { - return WriteSparseTensorMessage(sparse_tensor, out_->body_length, buffer_meta_) + return WriteSparseTensorMessage(sparse_tensor, out_->body_length, buffer_meta_, + options_) .Value(&out_->metadata); } @@ -852,8 +859,8 @@ class SparseTensorSerializer { IpcPayload* out_; std::vector buffer_meta_; - int64_t buffer_start_offset_; + IpcWriteOptions options_; }; } // namespace internal diff --git a/cpp/src/arrow/testing/json_integration_test.cc b/cpp/src/arrow/testing/json_integration_test.cc index 16067c5a0c6..e2f01da2de6 100644 --- a/cpp/src/arrow/testing/json_integration_test.cc +++ b/cpp/src/arrow/testing/json_integration_test.cc @@ -57,13 +57,13 @@ DEFINE_bool(verbose, true, "Verbose output"); namespace arrow { -class Buffer; using internal::TemporaryDir; +using ipc::DictionaryMemo; +using ipc::IpcWriteOptions; +using ipc::MetadataVersion; namespace testing { -using ::arrow::ipc::DictionaryMemo; - using namespace ::arrow::ipc::test; // NOLINT // Convert JSON file to IPC binary format @@ -83,8 +83,8 @@ static Status ConvertJsonToArrow(const std::string& json_path, << reader->schema()->ToString(/* show_metadata = */ true) << std::endl; } - ARROW_ASSIGN_OR_RAISE(auto writer, - ipc::NewFileWriter(out_file.get(), reader->schema())); + ARROW_ASSIGN_OR_RAISE(auto writer, ipc::NewFileWriter(out_file.get(), reader->schema(), + IpcWriteOptions::Defaults())); for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr batch; RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch)); diff --git a/cpp/src/arrow/testing/json_internal.cc b/cpp/src/arrow/testing/json_internal.cc index 1ddaf3a4541..31a64f52e91 100644 --- a/cpp/src/arrow/testing/json_internal.cc +++ b/cpp/src/arrow/testing/json_internal.cc @@ -940,6 +940,7 @@ static Status GetUnion(const RjObject& json_type, std::vector type_codes; const auto& id_array = it_type_codes->value.GetArray(); + type_codes.reserve(id_array.Size()); for (const rj::Value& val : id_array) { DCHECK(val.IsInt()); type_codes.push_back(static_cast(val.GetInt())); @@ -1591,6 +1592,7 @@ class ArrayReader { RETURN_NOT_ARRAY("VALIDITY", json_valid_iter, obj_); const auto& json_validity = json_valid_iter->value.GetArray(); DCHECK_EQ(static_cast(json_validity.Size()), length_); + is_valid_.reserve(json_validity.Size()); for (const rj::Value& val : json_validity) { DCHECK(val.IsInt()); is_valid_.push_back(val.GetInt() != 0); diff --git a/cpp/src/generated/Schema_generated.h b/cpp/src/generated/Schema_generated.h index ca8a6239e72..91e01d33758 100644 --- a/cpp/src/generated/Schema_generated.h +++ b/cpp/src/generated/Schema_generated.h @@ -89,45 +89,111 @@ struct Schema; struct SchemaBuilder; enum class MetadataVersion : int16_t { - /// 0.1.0 + /// 0.1.0 (October 2016). V1 = 0, - /// 0.2.0 + /// 0.2.0 (February 2017). Non-backwards compatible with V1. V2 = 1, - /// 0.3.0 -> 0.7.1 + /// 0.3.0 -> 0.7.1 (May - December 2017). Non-backwards compatible with V2. V3 = 2, - /// >= 0.8.0 + /// >= 0.8.0 (December 2017). Non-backwards compatible with V3. 
V4 = 3, + /// >= 1.0.0 (July 2020. Backwards compatible with V4 (V5 readers can read V4 + /// metadata and IPC messages). Implementations are recommended to provide a + /// V4 compatibility mode with V5 format changes disabled. + /// + /// Incompatible changes between V4 and V5: + /// - Union buffer layout has changed. In V5, Unions don't have a validity + /// bitmap buffer. + V5 = 4, MIN = V1, - MAX = V4 + MAX = V5 }; -inline const MetadataVersion (&EnumValuesMetadataVersion())[4] { +inline const MetadataVersion (&EnumValuesMetadataVersion())[5] { static const MetadataVersion values[] = { MetadataVersion::V1, MetadataVersion::V2, MetadataVersion::V3, - MetadataVersion::V4 + MetadataVersion::V4, + MetadataVersion::V5 }; return values; } inline const char * const *EnumNamesMetadataVersion() { - static const char * const names[5] = { + static const char * const names[6] = { "V1", "V2", "V3", "V4", + "V5", nullptr }; return names; } inline const char *EnumNameMetadataVersion(MetadataVersion e) { - if (flatbuffers::IsOutRange(e, MetadataVersion::V1, MetadataVersion::V4)) return ""; + if (flatbuffers::IsOutRange(e, MetadataVersion::V1, MetadataVersion::V5)) return ""; const size_t index = static_cast(e); return EnumNamesMetadataVersion()[index]; } +/// Represents Arrow Features that might not have full support +/// within implementations. This is intended to be used in +/// two scenarios: +/// 1. A mechanism for readers of Arrow Streams +/// and files to understand that the stream or file makes +/// use of a feature that isn't supported or unknown to +/// the implementation (and therefore can meet the Arrow +/// forward compatibility guarantees). +/// 2. A means of negotiating between a client and server +/// what features a stream is allowed to use. The enums +/// values here are intented to represent higher level +/// features, additional details maybe negotiated +/// with key-value pairs specific to the protocol. +/// +/// Enums added to this list should be assigned power-of-two values +/// to facilitate exchanging and comparing bitmaps for supported +/// features. +enum class Feature : int64_t { + /// Needed to make flatbuffers happy. + UNUSED = 0, + /// The stream makes use of multiple full dictionaries with the + /// same ID and assumes clients implement dictionary replacement + /// correctly. + DICTIONARY_REPLACEMENT = 1LL, + /// The stream makes use of compressed bodies as described + /// in Message.fbs. + COMPRESSED_BODY = 2LL, + MIN = UNUSED, + MAX = COMPRESSED_BODY +}; + +inline const Feature (&EnumValuesFeature())[3] { + static const Feature values[] = { + Feature::UNUSED, + Feature::DICTIONARY_REPLACEMENT, + Feature::COMPRESSED_BODY + }; + return values; +} + +inline const char * const *EnumNamesFeature() { + static const char * const names[4] = { + "UNUSED", + "DICTIONARY_REPLACEMENT", + "COMPRESSED_BODY", + nullptr + }; + return names; +} + +inline const char *EnumNameFeature(Feature e) { + if (flatbuffers::IsOutRange(e, Feature::UNUSED, Feature::COMPRESSED_BODY)) return ""; + const size_t index = static_cast(e); + return EnumNamesFeature()[index]; +} + enum class UnionMode : int16_t { Sparse = 0, Dense = 1, @@ -1595,8 +1661,11 @@ struct DictionaryEncoding FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { int64_t id() const { return GetField(VT_ID, 0); } - /// The dictionary indices are constrained to be positive integers. If this - /// field is null, the indices must be signed int32 + /// The dictionary indices are constrained to be non-negative integers. 
If + /// this field is null, the indices must be signed int32. To maximize + /// cross-language compatibility and performance, implementations are + /// recommended to prefer signed integer types over unsigned integer types + /// and to avoid uint64 indices unless they are required by an application. const org::apache::arrow::flatbuf::Int *indexType() const { return GetPointer(VT_INDEXTYPE); } @@ -1960,7 +2029,8 @@ struct Schema FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_ENDIANNESS = 4, VT_FIELDS = 6, - VT_CUSTOM_METADATA = 8 + VT_CUSTOM_METADATA = 8, + VT_FEATURES = 10 }; /// endianness of the buffer /// it is Little Endian by default @@ -1974,6 +2044,10 @@ struct Schema FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { const flatbuffers::Vector> *custom_metadata() const { return GetPointer> *>(VT_CUSTOM_METADATA); } + /// Features used in the stream/file. + const flatbuffers::Vector *features() const { + return GetPointer *>(VT_FEATURES); + } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && VerifyField(verifier, VT_ENDIANNESS) && @@ -1983,6 +2057,8 @@ struct Schema FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VerifyOffset(verifier, VT_CUSTOM_METADATA) && verifier.VerifyVector(custom_metadata()) && verifier.VerifyVectorOfTables(custom_metadata()) && + VerifyOffset(verifier, VT_FEATURES) && + verifier.VerifyVector(features()) && verifier.EndTable(); } }; @@ -2000,6 +2076,9 @@ struct SchemaBuilder { void add_custom_metadata(flatbuffers::Offset>> custom_metadata) { fbb_.AddOffset(Schema::VT_CUSTOM_METADATA, custom_metadata); } + void add_features(flatbuffers::Offset> features) { + fbb_.AddOffset(Schema::VT_FEATURES, features); + } explicit SchemaBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); @@ -2016,8 +2095,10 @@ inline flatbuffers::Offset CreateSchema( flatbuffers::FlatBufferBuilder &_fbb, org::apache::arrow::flatbuf::Endianness endianness = org::apache::arrow::flatbuf::Endianness::Little, flatbuffers::Offset>> fields = 0, - flatbuffers::Offset>> custom_metadata = 0) { + flatbuffers::Offset>> custom_metadata = 0, + flatbuffers::Offset> features = 0) { SchemaBuilder builder_(_fbb); + builder_.add_features(features); builder_.add_custom_metadata(custom_metadata); builder_.add_fields(fields); builder_.add_endianness(endianness); @@ -2028,14 +2109,17 @@ inline flatbuffers::Offset CreateSchemaDirect( flatbuffers::FlatBufferBuilder &_fbb, org::apache::arrow::flatbuf::Endianness endianness = org::apache::arrow::flatbuf::Endianness::Little, const std::vector> *fields = nullptr, - const std::vector> *custom_metadata = nullptr) { + const std::vector> *custom_metadata = nullptr, + const std::vector *features = nullptr) { auto fields__ = fields ? _fbb.CreateVector>(*fields) : 0; auto custom_metadata__ = custom_metadata ? _fbb.CreateVector>(*custom_metadata) : 0; + auto features__ = features ? _fbb.CreateVector(*features) : 0; return org::apache::arrow::flatbuf::CreateSchema( _fbb, endianness, fields__, - custom_metadata__); + custom_metadata__, + features__); } inline bool VerifyType(flatbuffers::Verifier &verifier, const void *obj, Type type) { diff --git a/format/Schema.fbs b/format/Schema.fbs index 09b07307b83..fd886f698f5 100644 --- a/format/Schema.fbs +++ b/format/Schema.fbs @@ -36,7 +36,9 @@ enum MetadataVersion:short { /// metadata and IPC messages). 
Implementations are recommended to provide a /// V4 compatibility mode with V5 format changes disabled. /// - /// TODO: Add list of non-forward compatible changes. + /// Incompatible changes between V4 and V5: + /// - Union buffer layout has changed. In V5, Unions don't have a validity + /// bitmap buffer. V5, } diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index cefac7ecb51..dd6fa2f2d37 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -200,10 +200,10 @@ def _plasma_store_entry_point(): "plasma-store-server") _os.execv(plasma_store_executable, _sys.argv) + # ---------------------------------------------------------------------- # Deprecations - from pyarrow.util import _deprecate_api, _deprecate_class read_message = _deprecate_api("read_message", "ipc.read_message", @@ -279,7 +279,7 @@ def _deprecate_scalar(ty, symbol): # TODO: Deprecate these somehow in the pyarrow namespace -from pyarrow.ipc import (Message, MessageReader, +from pyarrow.ipc import (Message, MessageReader, MetadataVersion, RecordBatchFileReader, RecordBatchFileWriter, RecordBatchStreamReader, RecordBatchStreamWriter) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 6daa283a0b2..96a2182dc3b 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1287,11 +1287,15 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: MessageType_DICTIONARY_BATCH\ " arrow::ipc::MessageType::DICTIONARY_BATCH" - enum MetadataVersion" arrow::ipc::MetadataVersion": - MessageType_V1" arrow::ipc::MetadataVersion::V1" - MessageType_V2" arrow::ipc::MetadataVersion::V2" - MessageType_V3" arrow::ipc::MetadataVersion::V3" - MessageType_V4" arrow::ipc::MetadataVersion::V4" + # TODO: use "cpdef enum class" to automatically get a Python wrapper? 
+ # See + # https://github.com/cython/cython/commit/2c7c22f51405299a4e247f78edf52957d30cf71d#diff-61c1365c0f761a8137754bb3a73bfbf7 + ctypedef enum CMetadataVersion" arrow::ipc::MetadataVersion": + CMetadataVersion_V1" arrow::ipc::MetadataVersion::V1" + CMetadataVersion_V2" arrow::ipc::MetadataVersion::V2" + CMetadataVersion_V3" arrow::ipc::MetadataVersion::V3" + CMetadataVersion_V4" arrow::ipc::MetadataVersion::V4" + CMetadataVersion_V5" arrow::ipc::MetadataVersion::V5" cdef cppclass CIpcWriteOptions" arrow::ipc::IpcWriteOptions": c_bool allow_64bit @@ -1329,7 +1333,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: c_bool Equals(const CMessage& other) shared_ptr[CBuffer] metadata() - MetadataVersion metadata_version() + CMetadataVersion metadata_version() MessageType type() CStatus SerializeTo(COutputStream* stream, diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index a91eabd9875..6d67539a6c9 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -18,6 +18,32 @@ import warnings +cpdef enum MetadataVersion: + V1 = CMetadataVersion_V1 + V2 = CMetadataVersion_V2 + V3 = CMetadataVersion_V3 + V4 = CMetadataVersion_V4 + V5 = CMetadataVersion_V5 + + +cdef object _wrap_metadata_version(CMetadataVersion version): + return MetadataVersion( version) + + +cdef CMetadataVersion _unwrap_metadata_version( + MetadataVersion version) except *: + if version == MetadataVersion.V1: + return CMetadataVersion_V1 + elif version == MetadataVersion.V2: + return CMetadataVersion_V2 + elif version == MetadataVersion.V3: + return CMetadataVersion_V3 + elif version == MetadataVersion.V4: + return CMetadataVersion_V4 + elif version == MetadataVersion.V5: + return CMetadataVersion_V5 + + cdef class Message: """ Container for an Arrow IPC message with metadata and optional body @@ -39,6 +65,10 @@ cdef class Message: def metadata(self): return pyarrow_wrap_buffer(self.message.get().metadata()) + @property + def metadata_version(self): + return _wrap_metadata_version(self.message.get().metadata_version()) + @property def body(self): cdef shared_ptr[CBuffer] body = self.message.get().body() diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index fbbf98aa0cf..039d625eed4 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -20,7 +20,7 @@ import pyarrow as pa -from pyarrow.lib import (Message, MessageReader, # noqa +from pyarrow.lib import (Message, MessageReader, MetadataVersion, # noqa read_message, read_record_batch, read_schema, read_tensor, write_tensor, get_record_batch_size, get_tensor_size) diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index cda38d6c4e6..f299f27fd26 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -374,11 +374,13 @@ def test_message_reader(example_messages): assert messages[0].type == 'schema' assert isinstance(messages[0].metadata, pa.Buffer) assert isinstance(messages[0].body, pa.Buffer) + assert messages[0].metadata_version == pa.MetadataVersion.V5 for msg in messages[1:]: assert msg.type == 'record batch' assert isinstance(msg.metadata, pa.Buffer) assert isinstance(msg.body, pa.Buffer) + assert msg.metadata_version == pa.MetadataVersion.V5 def test_message_serialize_read_message(example_messages): diff --git a/testing b/testing index 90b987c4fd6..bd81ce53fe6 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 90b987c4fd696ca7fabcf53fdbb49c62cd39c47c +Subproject commit bd81ce53fe68daad0b591df15a38c6ce2458ceb4
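Writer-side usage under the new default can be summarized as follows. This is a minimal sketch assuming the usual Arrow public headers; it relies only on names that appear in the patch (IpcWriteOptions::Defaults(), metadata_version, MetadataVersion::V4/V5, and NewStreamWriter taking an explicit options argument). The helper name WriteBatchCompatV4 is hypothetical, introduced just for the example.

    #include <memory>

    #include "arrow/api.h"
    #include "arrow/io/api.h"
    #include "arrow/ipc/api.h"

    // Write a single record batch as an IPC stream readable by pre-1.0.0
    // consumers (>= 0.8.0) by switching the new V5 default back to V4.
    arrow::Status WriteBatchCompatV4(const std::shared_ptr<arrow::RecordBatch>& batch,
                                     arrow::io::OutputStream* sink) {
      auto options = arrow::ipc::IpcWriteOptions::Defaults();      // metadata_version defaults to V5
      options.metadata_version = arrow::ipc::MetadataVersion::V4;  // opt in to the compatibility mode
      ARROW_ASSIGN_OR_RAISE(auto writer,
                            arrow::ipc::NewStreamWriter(sink, batch->schema(), options));
      RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
      return writer->Close();
    }

As the reader change above notes, pre-1.0.0 union arrays whose top-level validity bitmap actually carries nulls are rejected with Status::Invalid rather than reinterpreted, since rewriting type ids and child bitmaps to drop the legacy buffer would be error-prone.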