From 5fd327f1d4afc732b3c5d5b94c5a322741a03954 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 9 May 2023 09:59:43 +0800 Subject: [PATCH 1/3] GH-35304: [C++][ORC] Support attributes conversion --- cpp/src/arrow/adapters/orc/adapter.cc | 6 +- cpp/src/arrow/adapters/orc/adapter_test.cc | 110 +++++++++++++++++++++ cpp/src/arrow/adapters/orc/util.cc | 77 +++++++++++---- cpp/src/arrow/adapters/orc/util.h | 8 ++ 4 files changed, 177 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc index ac2a3db645c..2466e7433ab 100644 --- a/cpp/src/arrow/adapters/orc/adapter.cc +++ b/cpp/src/arrow/adapters/orc/adapter.cc @@ -321,9 +321,9 @@ class ORCFileReader::Impl { std::vector> fields; fields.reserve(size); for (int child = 0; child < size; ++child) { - ARROW_ASSIGN_OR_RAISE(auto elemtype, GetArrowType(type.getSubtype(child))); - std::string name = type.getFieldName(child); - fields.push_back(field(std::move(name), std::move(elemtype))); + const std::string& name = type.getFieldName(child); + ARROW_ASSIGN_OR_RAISE(auto elem_field, GetArrowField(name, type.getSubtype(child))); + fields.push_back(std::move(elem_field)); } ARROW_ASSIGN_OR_RAISE(auto metadata, ReadMetadata()); return std::make_shared(std::move(fields), std::move(metadata)); diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc index 890e9e89726..ba94d8d93f4 100644 --- a/cpp/src/arrow/adapters/orc/adapter_test.cc +++ b/cpp/src/arrow/adapters/orc/adapter_test.cc @@ -506,6 +506,116 @@ TEST(TestAdapterRead, ReadCharAndVarcharType) { ASSERT_EQ(nullptr, record_batch); } +TEST(TestAdapterRead, ReadFieldAttributes) { + const std::string id_key = "iceberg.id"; + const std::string required_key = "iceberg.required"; + + auto set_attributes = [&](liborc::Type* type, const std::string& id, + const std::string& required) { + type->setAttribute(id_key, id); + type->setAttribute(required_key, required); + }; + + auto check_attributes = [&](const std::shared_ptr& field, + const std::string& expect_id, + const std::string& expect_required) { + auto field_metadata = field->metadata(); + ASSERT_NE(field_metadata, nullptr); + ASSERT_EQ(expect_id, field_metadata->Get(id_key)); + ASSERT_EQ(expect_required, field_metadata->Get(required_key)); + }; + + auto c1_type = liborc::createPrimitiveType(liborc::TypeKind::INT); + set_attributes(c1_type.get(), "1", "true"); + + auto c2_elem_type = liborc::createPrimitiveType(liborc::TypeKind::INT); + set_attributes(c2_elem_type.get(), "3", "false"); + auto c2_type = liborc::createListType(std::move(c2_elem_type)); + set_attributes(c2_type.get(), "2", "false"); + + auto c3_key_type = liborc::createPrimitiveType(liborc::TypeKind::INT); + set_attributes(c3_key_type.get(), "5", "true"); + auto c3_value_type = liborc::createPrimitiveType(liborc::TypeKind::INT); + set_attributes(c3_value_type.get(), "6", "false"); + auto c3_type = liborc::createMapType(std::move(c3_key_type), std::move(c3_value_type)); + set_attributes(c3_type.get(), "4", "false"); + + auto c4_sub_type = liborc::createPrimitiveType(liborc::TypeKind::INT); + set_attributes(c4_sub_type.get(), "8", "false"); + auto c4_type = liborc::createStructType(); + c4_type->addStructField("c4_1", std::move(c4_sub_type)); + set_attributes(c4_type.get(), "7", "false"); + + auto orc_type = liborc::createStructType(); + orc_type->addStructField("c1", std::move(c1_type)); + orc_type->addStructField("c2", std::move(c2_type)); + orc_type->addStructField("c3", std::move(c3_type)); + orc_type->addStructField("c4", std::move(c4_type)); + + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + auto writer = CreateWriter(/*stripe_size=*/1024, *orc_type, &mem_stream); + writer->close(); + + std::shared_ptr in_stream(std::make_shared( + reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength()))); + ASSERT_OK_AND_ASSIGN( + auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + ASSERT_EQ(0, reader->NumberOfRows()); + + ASSERT_OK_AND_ASSIGN(auto schema, reader->ReadSchema()); + ASSERT_EQ(4, schema->num_fields()); + + // check top level fields + check_attributes(schema->field(0), "1", "true"); + check_attributes(schema->field(1), "2", "false"); + check_attributes(schema->field(2), "4", "false"); + check_attributes(schema->field(3), "7", "false"); + + // check list element type + auto list_type = checked_pointer_cast(schema->field(1)->type()); + check_attributes(list_type->value_field(), "3", "false"); + + // check map key/value types + auto map_type = checked_pointer_cast(schema->field(2)->type()); + check_attributes(map_type->key_field(), "5", "true"); + check_attributes(map_type->item_field(), "6", "false"); + + // check struct sub-field type + auto struct_type = checked_pointer_cast(schema->field(3)->type()); + check_attributes(struct_type->field(0), "8", "false"); +} + +TEST(TestAdapterReadWrite, FieldAttributesRoundTrip) { + EXPECT_OK_AND_ASSIGN(auto buffer_output_stream, io::BufferOutputStream::Create(1024)); + auto write_options = adapters::orc::WriteOptions(); + write_options.compression = Compression::UNCOMPRESSED; + EXPECT_OK_AND_ASSIGN(auto writer, adapters::orc::ORCFileWriter::Open( + buffer_output_stream.get(), write_options)); + + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64(), true, key_value_metadata({"k0"}, {"v0"})), + ::arrow::field("c1", ::arrow::utf8(), true, key_value_metadata({"k1"}, {"v1"})), + ::arrow::field( + "c2", ::arrow::list(::arrow::field("item", ::arrow::int64(), true, + key_value_metadata({"k0"}, {"ddv0"}))))}); + auto expected_output_table = ::arrow::TableFromJSON(schema, {R"([[1, "a", [1, 2]]])"}); + ARROW_EXPECT_OK(writer->Write(*expected_output_table)); + ARROW_EXPECT_OK(writer->Close()); + + EXPECT_OK_AND_ASSIGN(auto buffer, buffer_output_stream->Finish()); + std::shared_ptr in_stream(new io::BufferReader(buffer)); + EXPECT_OK_AND_ASSIGN( + auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + EXPECT_OK_AND_ASSIGN(auto actual_output_table, reader->Read()); + ASSERT_OK(actual_output_table->ValidateFull()); + AssertTablesEqual(*expected_output_table, *actual_output_table, false, false); + + // Check schema equality with metadata. + EXPECT_OK_AND_ASSIGN(auto read_schema, reader->ReadSchema()); + AssertSchemaEqual(schema, read_schema, /*check_metadata=*/true); +} + // Trivial class TestORCWriterTrivialNoWrite : public ::testing::Test {}; diff --git a/cpp/src/arrow/adapters/orc/util.cc b/cpp/src/arrow/adapters/orc/util.cc index d09ddb0d198..d45f0c53491 100644 --- a/cpp/src/arrow/adapters/orc/util.cc +++ b/cpp/src/arrow/adapters/orc/util.cc @@ -951,6 +951,15 @@ Status WriteBatch(const Array& array, int64_t orc_offset, } } +void SetAttributes(const std::shared_ptr& field, liborc::Type* type) { + if (field->HasMetadata()) { + const auto& metadata = field->metadata(); + for (int j = 0; j < metadata->size(); j++) { + type->setAttribute(metadata->key(j), metadata->value(j)); + } + } +} + Result> GetOrcType(const DataType& type) { Type::type kind = type.id(); switch (kind) { @@ -1000,9 +1009,9 @@ Result> GetOrcType(const DataType& type) { case Type::type::LIST: case Type::type::FIXED_SIZE_LIST: case Type::type::LARGE_LIST: { - std::shared_ptr arrow_child_type = - checked_cast(type).value_type(); - ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*arrow_child_type)); + const auto& value_field = checked_cast(type).value_field(); + ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*value_field->type())); + SetAttributes(value_field, orc_subtype.get()); return liborc::createListType(std::move(orc_subtype)); } case Type::type::STRUCT: { @@ -1011,19 +1020,19 @@ Result> GetOrcType(const DataType& type) { checked_cast(type).fields(); for (auto it = arrow_fields.begin(); it != arrow_fields.end(); ++it) { std::string field_name = (*it)->name(); - std::shared_ptr arrow_child_type = (*it)->type(); - ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*arrow_child_type)); + ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*(*it)->type())); + SetAttributes(*it, orc_subtype.get()); out_type->addStructField(field_name, std::move(orc_subtype)); } return std::move(out_type); } case Type::type::MAP: { - std::shared_ptr key_arrow_type = - checked_cast(type).key_type(); - std::shared_ptr item_arrow_type = - checked_cast(type).item_type(); - ARROW_ASSIGN_OR_RAISE(auto key_orc_type, GetOrcType(*key_arrow_type)); - ARROW_ASSIGN_OR_RAISE(auto item_orc_type, GetOrcType(*item_arrow_type)); + const auto& key_field = checked_cast(type).key_field(); + const auto& item_field = checked_cast(type).item_field(); + ARROW_ASSIGN_OR_RAISE(auto key_orc_type, GetOrcType(*key_field->type())); + ARROW_ASSIGN_OR_RAISE(auto item_orc_type, GetOrcType(*item_field->type())); + SetAttributes(key_field, key_orc_type.get()); + SetAttributes(item_field, item_orc_type.get()); return liborc::createMapType(std::move(key_orc_type), std::move(item_orc_type)); } case Type::type::DENSE_UNION: @@ -1034,6 +1043,7 @@ Result> GetOrcType(const DataType& type) { for (const auto& arrow_field : arrow_fields) { std::shared_ptr arrow_child_type = arrow_field->type(); ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*arrow_child_type)); + SetAttributes(arrow_field, orc_subtype.get()); out_type->addUnionChild(std::move(orc_subtype)); } return std::move(out_type); @@ -1132,23 +1142,26 @@ Result> GetArrowType(const liborc::Type* type) { if (subtype_count != 1) { return Status::TypeError("Invalid Orc List type"); } - ARROW_ASSIGN_OR_RAISE(auto elemtype, GetArrowType(type->getSubtype(0))); - return list(std::move(elemtype)); + ARROW_ASSIGN_OR_RAISE(auto elem_field, GetArrowField("item", type->getSubtype(0))); + return list(std::move(elem_field)); } case liborc::MAP: { if (subtype_count != 2) { return Status::TypeError("Invalid Orc Map type"); } - ARROW_ASSIGN_OR_RAISE(auto key_type, GetArrowType(type->getSubtype(0))); - ARROW_ASSIGN_OR_RAISE(auto item_type, GetArrowType(type->getSubtype(1))); - return map(std::move(key_type), std::move(item_type)); + ARROW_ASSIGN_OR_RAISE( + auto key_field, GetArrowField("key", type->getSubtype(0), /*nullable=*/false)); + ARROW_ASSIGN_OR_RAISE(auto value_field, + GetArrowField("value", type->getSubtype(1))); + return std::make_shared(std::move(key_field), std::move(value_field)); } case liborc::STRUCT: { FieldVector fields(subtype_count); for (int child = 0; child < subtype_count; ++child) { - ARROW_ASSIGN_OR_RAISE(auto elem_type, GetArrowType(type->getSubtype(child))); - std::string name = type->getFieldName(child); - fields[child] = field(std::move(name), std::move(elem_type)); + const auto& name = type->getFieldName(child); + ARROW_ASSIGN_OR_RAISE(auto elem_field, + GetArrowField(name, type->getSubtype(child))); + fields[child] = std::move(elem_field); } return struct_(std::move(fields)); } @@ -1159,8 +1172,9 @@ Result> GetArrowType(const liborc::Type* type) { FieldVector fields(subtype_count); std::vector type_codes(subtype_count); for (int child = 0; child < subtype_count; ++child) { - ARROW_ASSIGN_OR_RAISE(auto elem_type, GetArrowType(type->getSubtype(child))); - fields[child] = field("_union_" + ToChars(child), std::move(elem_type)); + ARROW_ASSIGN_OR_RAISE(auto elem_field, GetArrowField("_union_" + ToChars(child), + type->getSubtype(child))); + fields[child] = std::move(elem_field); type_codes[child] = static_cast(child); } return sparse_union(std::move(fields), std::move(type_codes)); @@ -1176,11 +1190,32 @@ Result> GetOrcType(const Schema& schema) { for (int i = 0; i < numFields; i++) { const auto& field = schema.field(i); ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*field->type())); + SetAttributes(field, orc_subtype.get()); out_type->addStructField(field->name(), std::move(orc_subtype)); } return std::move(out_type); } +Result> GetFieldMetadata( + const liborc::Type* type) { + if (type == nullptr) { + return nullptr; + } + const auto keys = type->getAttributeKeys(); + auto metadata = std::make_shared(); + for (const auto& key : keys) { + metadata->Append(key, type->getAttributeValue(key)); + } + return std::const_pointer_cast(metadata); +} + +Result> GetArrowField(const std::string& name, + const liborc::Type* type, bool nullable) { + ARROW_ASSIGN_OR_RAISE(auto arrow_type, GetArrowType(type)); + ARROW_ASSIGN_OR_RAISE(auto metadata, GetFieldMetadata(type)); + return field(name, std::move(arrow_type), nullable, std::move(metadata)); +} + } // namespace orc } // namespace adapters } // namespace arrow diff --git a/cpp/src/arrow/adapters/orc/util.h b/cpp/src/arrow/adapters/orc/util.h index e112d790d76..46c67748150 100644 --- a/cpp/src/arrow/adapters/orc/util.h +++ b/cpp/src/arrow/adapters/orc/util.h @@ -23,6 +23,7 @@ #include "arrow/array/builder_base.h" #include "arrow/result.h" #include "arrow/status.h" +#include "arrow/util/key_value_metadata.h" #include "orc/OrcFile.hh" namespace liborc = orc; @@ -35,6 +36,13 @@ Result> GetArrowType(const liborc::Type* type); Result> GetOrcType(const Schema& schema); +Result> GetFieldMetadata( + const liborc::Type* type); + +Result> GetArrowField(const std::string& name, + const liborc::Type* type, + bool nullable = true); + ARROW_EXPORT Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch, int64_t offset, int64_t length, arrow::ArrayBuilder* builder); From 860851156a223eb3f0e33eb3ff6d930c79409faa Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Wed, 10 May 2023 22:51:20 +0800 Subject: [PATCH 2/3] add test for more types --- cpp/src/arrow/adapters/orc/adapter_test.cc | 31 +++++++++++++++++----- cpp/src/arrow/adapters/orc/util.cc | 1 + cpp/src/arrow/adapters/orc/util.h | 2 +- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc index ba94d8d93f4..cff9a6d7f91 100644 --- a/cpp/src/arrow/adapters/orc/adapter_test.cc +++ b/cpp/src/arrow/adapters/orc/adapter_test.cc @@ -594,12 +594,31 @@ TEST(TestAdapterReadWrite, FieldAttributesRoundTrip) { buffer_output_stream.get(), write_options)); auto schema = ::arrow::schema( - {::arrow::field("c0", ::arrow::int64(), true, key_value_metadata({"k0"}, {"v0"})), - ::arrow::field("c1", ::arrow::utf8(), true, key_value_metadata({"k1"}, {"v1"})), + {::arrow::field("c0", ::arrow::int64(), /*nullable=*/true, + key_value_metadata({"k0"}, {"v0"})), + ::arrow::field("c1", ::arrow::utf8(), /*nullable=*/true, + key_value_metadata({"k1"}, {"v1"})), ::arrow::field( - "c2", ::arrow::list(::arrow::field("item", ::arrow::int64(), true, - key_value_metadata({"k0"}, {"ddv0"}))))}); - auto expected_output_table = ::arrow::TableFromJSON(schema, {R"([[1, "a", [1, 2]]])"}); + "c2", ::arrow::list(::arrow::field("item", ::arrow::int64(), /*nullable=*/true, + key_value_metadata({"k2"}, {"v2"})))), + ::arrow::field("c3", + std::make_shared( + ::arrow::field("key", ::arrow::utf8(), /*nullable=*/false, + key_value_metadata({"k3"}, {"v3"})), + ::arrow::field("value", ::arrow::int64(), /*nullable=*/true, + key_value_metadata({"k4"}, {"v4"})))), + ::arrow::field("c4", ::arrow::struct_({::arrow::field( + "sub", ::arrow::int64(), + /*nullable=*/true, key_value_metadata({"k5"}, {"v5"}))})), + ::arrow::field("c5", + ::arrow::sparse_union( + {::arrow::field("_union_0", ::arrow::int64(), /*nullable=*/true, + key_value_metadata({"k6"}, {"v6"})), + ::arrow::field("_union_1", ::arrow::utf8(), /*nullable=*/true, + key_value_metadata({"k7"}, {"v7"}))}, + {0, 1}))}); + auto expected_output_table = ::arrow::TableFromJSON( + schema, {R"([[1, "a", [1, 2], [["a", 1]], {"sub": 1}, null]])"}); ARROW_EXPECT_OK(writer->Write(*expected_output_table)); ARROW_EXPECT_OK(writer->Close()); @@ -609,7 +628,7 @@ TEST(TestAdapterReadWrite, FieldAttributesRoundTrip) { auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); EXPECT_OK_AND_ASSIGN(auto actual_output_table, reader->Read()); ASSERT_OK(actual_output_table->ValidateFull()); - AssertTablesEqual(*expected_output_table, *actual_output_table, false, false); + AssertTablesEqual(*expected_output_table, *actual_output_table); // Check schema equality with metadata. EXPECT_OK_AND_ASSIGN(auto read_schema, reader->ReadSchema()); diff --git a/cpp/src/arrow/adapters/orc/util.cc b/cpp/src/arrow/adapters/orc/util.cc index d45f0c53491..18bdf45a018 100644 --- a/cpp/src/arrow/adapters/orc/util.cc +++ b/cpp/src/arrow/adapters/orc/util.cc @@ -30,6 +30,7 @@ #include "arrow/util/bitmap_ops.h" #include "arrow/util/checked_cast.h" #include "arrow/util/decimal.h" +#include "arrow/util/key_value_metadata.h" #include "arrow/util/range.h" #include "arrow/util/string.h" #include "arrow/visit_data_inline.h" diff --git a/cpp/src/arrow/adapters/orc/util.h b/cpp/src/arrow/adapters/orc/util.h index 46c67748150..00af9f4b76e 100644 --- a/cpp/src/arrow/adapters/orc/util.h +++ b/cpp/src/arrow/adapters/orc/util.h @@ -23,7 +23,7 @@ #include "arrow/array/builder_base.h" #include "arrow/result.h" #include "arrow/status.h" -#include "arrow/util/key_value_metadata.h" +#include "arrow/type_fwd.h" #include "orc/OrcFile.hh" namespace liborc = orc; From 9ebec317d8d21815c2e893a6249b2b18eb0285d7 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Wed, 10 May 2023 23:20:59 +0800 Subject: [PATCH 3/3] fix name and type --- cpp/src/arrow/adapters/orc/util.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/adapters/orc/util.cc b/cpp/src/arrow/adapters/orc/util.cc index 18bdf45a018..f4bdbae6a7b 100644 --- a/cpp/src/arrow/adapters/orc/util.cc +++ b/cpp/src/arrow/adapters/orc/util.cc @@ -955,8 +955,8 @@ Status WriteBatch(const Array& array, int64_t orc_offset, void SetAttributes(const std::shared_ptr& field, liborc::Type* type) { if (field->HasMetadata()) { const auto& metadata = field->metadata(); - for (int j = 0; j < metadata->size(); j++) { - type->setAttribute(metadata->key(j), metadata->value(j)); + for (int64_t i = 0; i < metadata->size(); i++) { + type->setAttribute(metadata->key(i), metadata->value(i)); } } } @@ -1203,6 +1203,9 @@ Result> GetFieldMetadata( return nullptr; } const auto keys = type->getAttributeKeys(); + if (keys.empty()) { + return nullptr; + } auto metadata = std::make_shared(); for (const auto& key : keys) { metadata->Append(key, type->getAttributeValue(key));