From 8a0293e95df4e902d90852e878e845ae13c044ec Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Sun, 24 Apr 2016 19:55:58 +0200 Subject: [PATCH 1/6] ARROW-92: Arrow to Parquet Schema conversion --- cpp/src/arrow/parquet/parquet-schema-test.cc | 100 +++++++++++++++++++ cpp/src/arrow/parquet/schema.cc | 99 ++++++++++++++++++ cpp/src/arrow/parquet/schema.h | 3 + 3 files changed, 202 insertions(+) diff --git a/cpp/src/arrow/parquet/parquet-schema-test.cc b/cpp/src/arrow/parquet/parquet-schema-test.cc index e2280f41189..3dcc7cc5de2 100644 --- a/cpp/src/arrow/parquet/parquet-schema-test.cc +++ b/cpp/src/arrow/parquet/parquet-schema-test.cc @@ -161,6 +161,106 @@ TEST_F(TestConvertParquetSchema, UnsupportedThings) { } } +class TestConvertArrowSchema : public ::testing::Test { + public: + virtual void SetUp() {} + + void CheckFlatSchema(const std::vector& nodes) { + NodePtr schema_node = GroupNode::Make("schema", Repetition::REPEATED, nodes); + const GroupNode* expected_schema_node = + static_cast(schema_node.get()); + const GroupNode* result_schema_node = + static_cast(result_schema_->schema().get()); + + ASSERT_EQ(expected_schema_node->field_count(), + result_schema_node->field_count()); + + for (int i = 0; i < expected_schema_node->field_count(); i++) { + auto lhs = result_schema_node->field(i); + auto rhs = expected_schema_node->field(i); + EXPECT_TRUE(lhs->Equals(rhs.get())); + } + } + + Status ConvertSchema(const std::vector>& fields) { + arrow_schema_ = std::make_shared(fields); + return ToParquetSchema(arrow_schema_.get(), &result_schema_); + } + + protected: + std::shared_ptr arrow_schema_; + std::shared_ptr<::parquet::SchemaDescriptor> result_schema_; +}; + +TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) { + std::vector parquet_fields; + std::vector> arrow_fields; + + parquet_fields.push_back( + PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN)); + arrow_fields.push_back(std::make_shared("boolean", BOOL, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32)); + arrow_fields.push_back(std::make_shared("int32", INT32, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64)); + arrow_fields.push_back(std::make_shared("int64", INT64, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT)); + arrow_fields.push_back(std::make_shared("float", FLOAT)); + + parquet_fields.push_back( + PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE)); + arrow_fields.push_back(std::make_shared("double", DOUBLE)); + + // TODO: String types need to be clarified a bit more in the Arrow spec + // parquet_fields.push_back(PrimitiveNode::Make( + // "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8)); + // arrow_fields.push_back(std::make_shared("string", UTF8)); + + // TODO: At the moment we have not enough information in the BINARY type in Arrow + // parquet_fields.push_back( + // PrimitiveNode::Make("binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY)); + // arrow_fields.push_back(std::make_shared("binary", BINARY)); + + // parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL, + // ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, 12)); + // arrow_fields.push_back(std::make_shared("flba-binary", BINARY)); + + ASSERT_OK(ConvertSchema(arrow_fields)); + + CheckFlatSchema(parquet_fields); +} + +TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) { + std::vector parquet_fields; + std::vector> arrow_fields; + + /*parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL, + ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 4, 8, 4)); + arrow_fields.push_back(std::make_shared("flba-decimal", DECIMAL_8_4)); + + parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL, + ParquetType::BYTE_ARRAY, LogicalType::DECIMAL, -1, 8, 4)); + arrow_fields.push_back(std::make_shared("binary-decimal", DECIMAL_8_4)); + + parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL, + ParquetType::INT32, LogicalType::DECIMAL, -1, 8, 4)); + arrow_fields.push_back(std::make_shared("int32-decimal", DECIMAL_8_4)); + + parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL, + ParquetType::INT64, LogicalType::DECIMAL, -1, 8, 4)); + arrow_fields.push_back(std::make_shared("int64-decimal", DECIMAL_8_4)); + + CheckFlatSchema(arrow_schema);*/ + ASSERT_OK(ConvertSchema(arrow_fields)); + + CheckFlatSchema(parquet_fields); +} + TEST(TestNodeConversion, DateAndTime) {} } // namespace parquet diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc index 560e2837406..0310ff4bc6c 100644 --- a/cpp/src/arrow/parquet/schema.cc +++ b/cpp/src/arrow/parquet/schema.cc @@ -24,6 +24,7 @@ #include "arrow/types/decimal.h" #include "arrow/util/status.h" +using parquet::Repetition; using parquet::schema::Node; using parquet::schema::NodePtr; using parquet::schema::GroupNode; @@ -182,6 +183,104 @@ Status FromParquetSchema( return Status::OK(); } +Status FieldToNode(const std::shared_ptr& field, NodePtr* out) { + LogicalType::type logical_type = LogicalType::NONE; + ParquetType::type type; + Repetition::type repetition = Repetition::REQUIRED; + if (field->nullable) { + repetition = Repetition::OPTIONAL; + } + + switch (field->type->type) { + // TODO: + // case Type::NA: + // break; + case Type::BOOL: + type = ParquetType::BOOLEAN; + break; + case Type::UINT8: + type = ParquetType::INT32; + logical_type = LogicalType::UINT_8; + break; + case Type::INT8: + type = ParquetType::INT32; + logical_type = LogicalType::INT_8; + break; + case Type::UINT16: + type = ParquetType::INT32; + logical_type = LogicalType::UINT_16; + break; + case Type::INT16: + type = ParquetType::INT32; + logical_type = LogicalType::INT_16; + break; + case Type::UINT32: + type = ParquetType::INT32; + logical_type = LogicalType::UINT_32; + break; + case Type::INT32: + type = ParquetType::INT32; + break; + case Type::UINT64: + type = ParquetType::INT64; + logical_type = LogicalType::UINT_64; + break; + case Type::INT64: + type = ParquetType::INT64; + break; + case Type::FLOAT: + type = ParquetType::FLOAT; + break; + case Type::DOUBLE: + type = ParquetType::DOUBLE; + break; + // // CHAR(N): fixed-length UTF8 string with length N + // CHAR = 12, + // // UTF8 variable-length string as List + // STRING = 13, + // // VARCHAR(N): Null-terminated string type embedded in a CHAR(N + 1) + // VARCHAR = 14, + + // // Variable-length bytes (no guarantee of UTF8-ness) + // BINARY = 15, + // By default, int32 days since the UNIX epoch + // DATE = 16, + // Exact timestamp encoded with int64 since UNIX epoch + // Default unit millisecond + // TIMESTAMP = 17, + // Timestamp as double seconds since the UNIX epoch + // TIMESTAMP_DOUBLE = 18, + // Exact time encoded with int64, default unit millisecond + // TIME = 19, + // Precision- and scale-based decimal type. Storage type depends on the + // parameters. + // DECIMAL = 20, + // Decimal value encoded as a text string + // DECIMAL_TEXT = 21, + default: + // TODO: LIST, STRUCT, DENSE_UNION, SPARE_UNION, JSON_SCALAR + return Status::NotImplemented("unhandled type"); + } + // TODO: handle required, repeated + *out = PrimitiveNode::Make(field->name, repetition, type, logical_type); + return Status::OK(); +} + +Status ToParquetSchema( + const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out) { + std::vector nodes(arrow_schema->num_fields()); + for (int i = 0; i < arrow_schema->num_fields(); i++) { + RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), &nodes[i])); + } + + NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); + *out = std::make_shared<::parquet::SchemaDescriptor>(); + (*out)->Init(schema); + + return Status::OK(); +} + + } // namespace parquet } // namespace arrow diff --git a/cpp/src/arrow/parquet/schema.h b/cpp/src/arrow/parquet/schema.h index a44a9a4b6a8..12a240364c9 100644 --- a/cpp/src/arrow/parquet/schema.h +++ b/cpp/src/arrow/parquet/schema.h @@ -36,6 +36,9 @@ Status NodeToField(const ::parquet::schema::NodePtr& node, std::shared_ptr* out); +Status ToParquetSchema( + const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out); + } // namespace parquet } // namespace arrow From 9a6c8766e9fe6dc23cb14b6ff07b86792586cbd2 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Sun, 1 May 2016 10:44:35 +0200 Subject: [PATCH 2/6] Add more types --- cpp/src/arrow/parquet/parquet-schema-test.cc | 35 ++--------- cpp/src/arrow/parquet/schema.cc | 62 +++++++++++--------- 2 files changed, 40 insertions(+), 57 deletions(-) diff --git a/cpp/src/arrow/parquet/parquet-schema-test.cc b/cpp/src/arrow/parquet/parquet-schema-test.cc index 3dcc7cc5de2..88bb1d78571 100644 --- a/cpp/src/arrow/parquet/parquet-schema-test.cc +++ b/cpp/src/arrow/parquet/parquet-schema-test.cc @@ -172,8 +172,7 @@ class TestConvertArrowSchema : public ::testing::Test { const GroupNode* result_schema_node = static_cast(result_schema_->schema().get()); - ASSERT_EQ(expected_schema_node->field_count(), - result_schema_node->field_count()); + ASSERT_EQ(expected_schema_node->field_count(), result_schema_node->field_count()); for (int i = 0; i < expected_schema_node->field_count(); i++) { auto lhs = result_schema_node->field(i); @@ -217,18 +216,9 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) { arrow_fields.push_back(std::make_shared("double", DOUBLE)); // TODO: String types need to be clarified a bit more in the Arrow spec - // parquet_fields.push_back(PrimitiveNode::Make( - // "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8)); - // arrow_fields.push_back(std::make_shared("string", UTF8)); - - // TODO: At the moment we have not enough information in the BINARY type in Arrow - // parquet_fields.push_back( - // PrimitiveNode::Make("binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY)); - // arrow_fields.push_back(std::make_shared("binary", BINARY)); - - // parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL, - // ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, 12)); - // arrow_fields.push_back(std::make_shared("flba-binary", BINARY)); + parquet_fields.push_back(PrimitiveNode::Make( + "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8)); + arrow_fields.push_back(std::make_shared("string", UTF8)); ASSERT_OK(ConvertSchema(arrow_fields)); @@ -239,23 +229,8 @@ TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) { std::vector parquet_fields; std::vector> arrow_fields; - /*parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL, - ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 4, 8, 4)); - arrow_fields.push_back(std::make_shared("flba-decimal", DECIMAL_8_4)); - - parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL, - ParquetType::BYTE_ARRAY, LogicalType::DECIMAL, -1, 8, 4)); - arrow_fields.push_back(std::make_shared("binary-decimal", DECIMAL_8_4)); - - parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL, - ParquetType::INT32, LogicalType::DECIMAL, -1, 8, 4)); - arrow_fields.push_back(std::make_shared("int32-decimal", DECIMAL_8_4)); - - parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL, - ParquetType::INT64, LogicalType::DECIMAL, -1, 8, 4)); - arrow_fields.push_back(std::make_shared("int64-decimal", DECIMAL_8_4)); + // TODO: Test Decimal Arrow -> Parquet conversion - CheckFlatSchema(arrow_schema);*/ ASSERT_OK(ConvertSchema(arrow_fields)); CheckFlatSchema(parquet_fields); diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc index 0310ff4bc6c..f921b055999 100644 --- a/cpp/src/arrow/parquet/schema.cc +++ b/cpp/src/arrow/parquet/schema.cc @@ -22,6 +22,7 @@ #include "parquet/api/schema.h" #include "arrow/types/decimal.h" +#include "arrow/types/string.h" #include "arrow/util/status.h" using parquet::Repetition; @@ -187,9 +188,8 @@ Status FieldToNode(const std::shared_ptr& field, NodePtr* out) { LogicalType::type logical_type = LogicalType::NONE; ParquetType::type type; Repetition::type repetition = Repetition::REQUIRED; - if (field->nullable) { - repetition = Repetition::OPTIONAL; - } + if (field->nullable) { repetition = Repetition::OPTIONAL; } + int length = -1; switch (field->type->type) { // TODO: @@ -234,35 +234,44 @@ Status FieldToNode(const std::shared_ptr& field, NodePtr* out) { case Type::DOUBLE: type = ParquetType::DOUBLE; break; - // // CHAR(N): fixed-length UTF8 string with length N - // CHAR = 12, - // // UTF8 variable-length string as List - // STRING = 13, - // // VARCHAR(N): Null-terminated string type embedded in a CHAR(N + 1) - // VARCHAR = 14, - - // // Variable-length bytes (no guarantee of UTF8-ness) - // BINARY = 15, - // By default, int32 days since the UNIX epoch - // DATE = 16, - // Exact timestamp encoded with int64 since UNIX epoch - // Default unit millisecond - // TIMESTAMP = 17, - // Timestamp as double seconds since the UNIX epoch - // TIMESTAMP_DOUBLE = 18, - // Exact time encoded with int64, default unit millisecond - // TIME = 19, + case Type::CHAR: + type = ParquetType::FIXED_LEN_BYTE_ARRAY; + logical_type = LogicalType::UTF8; + length = static_cast(field->type.get())->size; + break; + case Type::STRING: + type = ParquetType::BYTE_ARRAY; + logical_type = LogicalType::UTF8; + break; + case Type::BINARY: + type = ParquetType::BYTE_ARRAY; + break; + case Type::DATE: + type = ParquetType::INT32; + logical_type = LogicalType::DATE; + break; + case Type::TIMESTAMP: + type = ParquetType::INT64; + logical_type = LogicalType::TIMESTAMP_MILLIS; + break; + case Type::TIMESTAMP_DOUBLE: + type = ParquetType::INT64; + // This is specified as seconds since the UNIX epoch + // TODO: Converted type in Parquet? + // logical_type = LogicalType::TIMESTAMP_MILLIS; + break; + case Type::TIME: + type = ParquetType::INT64; + logical_type = LogicalType::TIME_MILLIS; // Precision- and scale-based decimal type. Storage type depends on the // parameters. // DECIMAL = 20, - // Decimal value encoded as a text string - // DECIMAL_TEXT = 21, default: - // TODO: LIST, STRUCT, DENSE_UNION, SPARE_UNION, JSON_SCALAR + // TODO: LIST, STRUCT, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR return Status::NotImplemented("unhandled type"); } - // TODO: handle required, repeated - *out = PrimitiveNode::Make(field->name, repetition, type, logical_type); + // TODO: handle repeated + *out = PrimitiveNode::Make(field->name, repetition, type, logical_type, length); return Status::OK(); } @@ -280,7 +289,6 @@ Status ToParquetSchema( return Status::OK(); } - } // namespace parquet } // namespace arrow From 38e68e51dcac0302a15d23804ba692b24ae25e35 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Sun, 1 May 2016 10:45:04 +0200 Subject: [PATCH 3/6] make format --- cpp/src/arrow/parquet/parquet-schema-test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/parquet/parquet-schema-test.cc b/cpp/src/arrow/parquet/parquet-schema-test.cc index 88bb1d78571..8de739491b5 100644 --- a/cpp/src/arrow/parquet/parquet-schema-test.cc +++ b/cpp/src/arrow/parquet/parquet-schema-test.cc @@ -217,7 +217,7 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) { // TODO: String types need to be clarified a bit more in the Arrow spec parquet_fields.push_back(PrimitiveNode::Make( - "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8)); + "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8)); arrow_fields.push_back(std::make_shared("string", UTF8)); ASSERT_OK(ConvertSchema(arrow_fields)); From 42ed0eae447df098084e71e4ca42e8a86d3a4926 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Sun, 1 May 2016 11:02:11 +0200 Subject: [PATCH 4/6] Add struct conversion --- cpp/src/arrow/parquet/schema.cc | 25 ++++++++++++++++++++----- cpp/src/arrow/parquet/schema.h | 2 ++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc index f921b055999..f807029ec7e 100644 --- a/cpp/src/arrow/parquet/schema.cc +++ b/cpp/src/arrow/parquet/schema.cc @@ -184,6 +184,20 @@ Status FromParquetSchema( return Status::OK(); } +Status StructToNode(const std::shared_ptr& type, const std::string& name, + bool nullable, NodePtr* out) { + Repetition::type repetition = Repetition::REQUIRED; + if (nullable) { repetition = Repetition::OPTIONAL; } + + std::vector children(type->num_children()); + for (int i = 0; i < type->num_children(); i++) { + RETURN_NOT_OK(FieldToNode(type->child(i), &children[i])); + } + + *out = GroupNode::Make(name, repetition, children); + return Status::OK(); +} + Status FieldToNode(const std::shared_ptr& field, NodePtr* out) { LogicalType::type logical_type = LogicalType::NONE; ParquetType::type type; @@ -263,14 +277,15 @@ Status FieldToNode(const std::shared_ptr& field, NodePtr* out) { case Type::TIME: type = ParquetType::INT64; logical_type = LogicalType::TIME_MILLIS; - // Precision- and scale-based decimal type. Storage type depends on the - // parameters. - // DECIMAL = 20, + break; + case Type::STRUCT: { + auto struct_type = std::static_pointer_cast(field->type); + return StructToNode(struct_type, field->name, field->nullable, out); + } break; default: - // TODO: LIST, STRUCT, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR + // TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR return Status::NotImplemented("unhandled type"); } - // TODO: handle repeated *out = PrimitiveNode::Make(field->name, repetition, type, logical_type, length); return Status::OK(); } diff --git a/cpp/src/arrow/parquet/schema.h b/cpp/src/arrow/parquet/schema.h index 12a240364c9..bfc7d211381 100644 --- a/cpp/src/arrow/parquet/schema.h +++ b/cpp/src/arrow/parquet/schema.h @@ -36,6 +36,8 @@ Status NodeToField(const ::parquet::schema::NodePtr& node, std::shared_ptr* out); +Status FieldToNode(const std::shared_ptr& field, ::parquet::schema::NodePtr* out); + Status ToParquetSchema( const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out); From 9c5b085b606134b01944196ec16649e75a5ac845 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Sun, 1 May 2016 11:29:23 +0200 Subject: [PATCH 5/6] Include string --- cpp/src/arrow/parquet/schema.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc index f807029ec7e..db0d8b8c924 100644 --- a/cpp/src/arrow/parquet/schema.cc +++ b/cpp/src/arrow/parquet/schema.cc @@ -17,6 +17,7 @@ #include "arrow/parquet/schema.h" +#include #include #include "parquet/api/schema.h" From e3aa261756f2b67657e6065d165e46f0ba2257ec Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Sun, 1 May 2016 19:06:47 +0200 Subject: [PATCH 6/6] Add macro to convert ParquetException to Status --- cpp/src/arrow/parquet/schema.cc | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc index db0d8b8c924..214c764f08b 100644 --- a/cpp/src/arrow/parquet/schema.cc +++ b/cpp/src/arrow/parquet/schema.cc @@ -21,11 +21,13 @@ #include #include "parquet/api/schema.h" +#include "parquet/exception.h" #include "arrow/types/decimal.h" #include "arrow/types/string.h" #include "arrow/util/status.h" +using parquet::ParquetException; using parquet::Repetition; using parquet::schema::Node; using parquet::schema::NodePtr; @@ -39,6 +41,11 @@ namespace arrow { namespace parquet { +#define PARQUET_CATCH_NOT_OK(s) \ + try { \ + (s); \ + } catch (const ParquetException& e) { return Status::Invalid(e.what()); } + const auto BOOL = std::make_shared(); const auto UINT8 = std::make_shared(); const auto INT32 = std::make_shared(); @@ -300,7 +307,7 @@ Status ToParquetSchema( NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); *out = std::make_shared<::parquet::SchemaDescriptor>(); - (*out)->Init(schema); + PARQUET_CATCH_NOT_OK((*out)->Init(schema)); return Status::OK(); }