diff --git a/c_glib/parquet-glib/arrow-file-reader.cpp b/c_glib/parquet-glib/arrow-file-reader.cpp
index 217bd190d51..db59436b4b6 100644
--- a/c_glib/parquet-glib/arrow-file-reader.cpp
+++ b/c_glib/parquet-glib/arrow-file-reader.cpp
@@ -231,15 +231,8 @@ gparquet_arrow_file_reader_get_schema(GParquetArrowFileReader *reader,
 {
   auto parquet_arrow_file_reader = gparquet_arrow_file_reader_get_raw(reader);
 
-  const auto n_columns =
-    parquet_arrow_file_reader->parquet_reader()->metadata()->num_columns();
-  std::vector<int> indices(n_columns);
-  for (int i = 0; i < n_columns; ++i) {
-    indices[i] = i;
-  }
-
   std::shared_ptr<::arrow::Schema> arrow_schema;
-  auto status = parquet_arrow_file_reader->GetSchema(indices, &arrow_schema);
+  auto status = parquet_arrow_file_reader->GetSchema(&arrow_schema);
   if (garrow_error_check(error,
                          status,
                          "[parquet][arrow][file-reader][get-schema]")) {
@@ -249,42 +242,6 @@ gparquet_arrow_file_reader_get_schema(GParquetArrowFileReader *reader,
   }
 }
 
-/**
- * gparquet_arrow_file_reader_select_schema:
- * @reader: A #GParquetArrowFileReader.
- * @column_indexes: (array length=n_column_indexes):
- *   The array of column indexes to be selected.
- * @n_column_indexes: The length of `column_indexes`.
- * @error: (nullable): Return location for a #GError or %NULL.
- *
- * Returns: (transfer full) (nullable): A selected #GArrowSchema.
- *
- * Since: 0.12.0
- */
-GArrowSchema *
-gparquet_arrow_file_reader_select_schema(GParquetArrowFileReader *reader,
-                                         gint *column_indexes,
-                                         gsize n_column_indexes,
-                                         GError **error)
-{
-  auto parquet_arrow_file_reader = gparquet_arrow_file_reader_get_raw(reader);
-
-  std::vector<int> indices(n_column_indexes);
-  for (gsize i = 0; i < n_column_indexes; ++i) {
-    indices[i] = column_indexes[i];
-  }
-
-  std::shared_ptr<::arrow::Schema> arrow_schema;
-  auto status = parquet_arrow_file_reader->GetSchema(indices, &arrow_schema);
-  if (garrow_error_check(error,
-                         status,
-                         "[parquet][arrow][file-reader][select-schema]")) {
-    return garrow_schema_new_raw(&arrow_schema);
-  } else {
-    return NULL;
-  }
-}
-
 /**
  * gparquet_arrow_file_reader_read_column_data:
  * @reader: A #GParquetArrowFileReader.
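Note: with gparquet_arrow_file_reader_select_schema() removed, the GLib binding now forwards to the index-free C++ FileReader::GetSchema(). A minimal sketch of the C++ call this maps to — the path "data.parquet" and the helper name PrintSchema are illustrative, not part of the patch:

#include <iostream>
#include <memory>

#include "arrow/io/file.h"
#include "parquet/arrow/reader.h"

// Print the complete Arrow schema of a Parquet file; GetSchema() no longer
// takes a vector of column indices.
arrow::Status PrintSchema() {
  std::shared_ptr<arrow::io::ReadableFile> infile;
  ARROW_RETURN_NOT_OK(arrow::io::ReadableFile::Open("data.parquet", &infile));

  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));

  std::shared_ptr<arrow::Schema> schema;
  ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
  std::cout << schema->ToString() << std::endl;
  return arrow::Status::OK();
}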
diff --git a/c_glib/parquet-glib/arrow-file-reader.h b/c_glib/parquet-glib/arrow-file-reader.h
index a0d1a8eca88..5a6ec962436 100644
--- a/c_glib/parquet-glib/arrow-file-reader.h
+++ b/c_glib/parquet-glib/arrow-file-reader.h
@@ -48,11 +48,6 @@ gparquet_arrow_file_reader_read_table(GParquetArrowFileReader *reader,
 GArrowSchema *
 gparquet_arrow_file_reader_get_schema(GParquetArrowFileReader *reader,
                                       GError **error);
-GArrowSchema *
-gparquet_arrow_file_reader_select_schema(GParquetArrowFileReader *reader,
-                                         gint *column_indexes,
-                                         gsize n_column_indexes,
-                                         GError **error);
 
 GARROW_AVAILABLE_IN_1_0
 GArrowChunkedArray *
diff --git a/c_glib/test/parquet/test-arrow-file-reader.rb b/c_glib/test/parquet/test-arrow-file-reader.rb
index 7ff17c2ba11..d30c8e950d3 100644
--- a/c_glib/test/parquet/test-arrow-file-reader.rb
+++ b/c_glib/test/parquet/test-arrow-file-reader.rb
@@ -39,19 +39,6 @@ def test_schema
     SCHEMA
   end
 
-  def test_select_schema
-    assert_equal(<<-SCHEMA.chomp, @reader.select_schema([0]).to_s)
-a: string
-    SCHEMA
-    assert_equal(<<-SCHEMA.chomp, @reader.select_schema([1]).to_s)
-b: int32
-    SCHEMA
-    assert_equal(<<-SCHEMA.chomp, @reader.select_schema([0, 1]).to_s)
-a: string
-b: int32
-    SCHEMA
-  end
-
   def test_read_column
     assert_equal([
                    Arrow::ChunkedArray.new([@a_array]),
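The deleted test has no one-for-one replacement; a caller that wants a projected schema can read a projected table and inspect its schema. A hedged sketch against the C++ reader (the index list {0, 1} and the helper name are illustrative):

// Read only the listed leaf columns; the projection that select_schema()
// used to expose is now simply table->schema().
arrow::Status ReadProjected(parquet::arrow::FileReader* reader) {
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(reader->ReadTable({0, 1}, &table));
  return table->Validate();
}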
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index d0d2536b8d1..2e57ab8cd44 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -40,6 +40,7 @@
 #include "parquet/api/writer.h"
 
 #include "parquet/arrow/reader.h"
+#include "parquet/arrow/reader_internal.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/arrow/test-util.h"
 #include "parquet/arrow/writer.h"
@@ -597,12 +598,16 @@ class TestParquetIO : public ::testing::Test {
     std::shared_ptr<::arrow::Schema> arrow_schema;
     ArrowReaderProperties props;
     ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
-    FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema), arrow_schema);
-    ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
-    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*values));
-    ASSERT_OK_NO_THROW(writer.Close());
-    // writer.Close() should be idempotent
-    ASSERT_OK_NO_THROW(writer.Close());
+
+    std::unique_ptr<FileWriter> writer;
+    ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                        MakeWriter(schema), arrow_schema,
+                                        default_arrow_writer_properties(), &writer));
+    ASSERT_OK_NO_THROW(writer->NewRowGroup(values->length()));
+    ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*values));
+    ASSERT_OK_NO_THROW(writer->Close());
+    // writer->Close() should be idempotent
+    ASSERT_OK_NO_THROW(writer->Close());
   }
 
   void ResetSink() { sink_ = CreateOutputStream(); }
@@ -789,13 +794,17 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
   std::shared_ptr<::arrow::Schema> arrow_schema;
   ArrowReaderProperties props;
   ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
-  FileWriter writer(default_memory_pool(), this->MakeWriter(schema), arrow_schema);
+
+  std::unique_ptr<FileWriter> writer;
+  ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                      this->MakeWriter(schema), arrow_schema,
+                                      default_arrow_writer_properties(), &writer));
   for (int i = 0; i < 4; i++) {
-    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(writer->NewRowGroup(chunk_size));
     std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size);
-    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array));
+    ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*sliced_array));
   }
-  ASSERT_OK_NO_THROW(writer.Close());
+  ASSERT_OK_NO_THROW(writer->Close());
 
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
 }
@@ -859,14 +868,17 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
   std::shared_ptr<::arrow::Schema> arrow_schema;
   ArrowReaderProperties props;
   ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
-  FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema),
-                    arrow_schema);
+
+  std::unique_ptr<FileWriter> writer;
+  ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                      this->MakeWriter(schema), arrow_schema,
+                                      default_arrow_writer_properties(), &writer));
   for (int i = 0; i < 4; i++) {
-    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(writer->NewRowGroup(chunk_size));
     std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size);
-    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array));
+    ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*sliced_array));
   }
-  ASSERT_OK_NO_THROW(writer.Close());
+  ASSERT_OK_NO_THROW(writer->Close());
 
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
 }
@@ -2624,11 +2636,15 @@ TEST(TestArrowReaderAdHoc, DISABLED_LargeStringColumn) {
       GroupNode::Make("schema", Repetition::REQUIRED, {schm->group_node()->field(0)}));
   auto writer = ParquetFileWriter::Open(sink, schm_node);
-  FileWriter arrow_writer(default_memory_pool(), std::move(writer), table->schema());
+
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(), std::move(writer),
+                                      table->schema(), default_arrow_writer_properties(),
+                                      &arrow_writer));
   for (int i : {0, 1}) {
-    ASSERT_OK_NO_THROW(arrow_writer.WriteTable(*table, table->num_rows())) << i;
+    ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table, table->num_rows())) << i;
   }
-  ASSERT_OK_NO_THROW(arrow_writer.Close());
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
 
   std::shared_ptr<Buffer> tables_buffer;
   ASSERT_OK_NO_THROW(sink->Finish(&tables_buffer));
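The test changes above all follow one pattern: the throwing FileWriter constructor is replaced by the static FileWriter::Make factory, which reports failures through Status. A sketch of the new construction flow, assuming an already-created ParquetFileWriter and that default_arrow_writer_properties() lives in parquet::arrow as in this patch; WriteOneTable is an illustrative name:

// Wrap a low-level ParquetFileWriter and write a single table through it.
arrow::Status WriteOneTable(const std::shared_ptr<arrow::Table>& table,
                            std::unique_ptr<parquet::ParquetFileWriter> base_writer) {
  std::unique_ptr<parquet::arrow::FileWriter> writer;
  ARROW_RETURN_NOT_OK(parquet::arrow::FileWriter::Make(
      arrow::default_memory_pool(), std::move(base_writer), table->schema(),
      parquet::arrow::default_arrow_writer_properties(), &writer));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table, table->num_rows()));
  return writer->Close();  // Close() stays idempotent, as the test asserts
}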
&result_schema_); - } - - ::arrow::Status ConvertSchema(const std::vector& nodes, - const std::vector& column_indices) { - NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); - descr_.Init(schema); - ArrowReaderProperties props; - return FromParquetSchema(&descr_, column_indices, props, &result_schema_); - } - ::arrow::Status ConvertSchema( const std::vector& nodes, - const std::shared_ptr& key_value_metadata) { + const std::shared_ptr& key_value_metadata = nullptr) { NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); descr_.Init(schema); ArrowReaderProperties props; - return FromParquetSchema(&descr_, {}, props, key_value_metadata, &result_schema_); + return FromParquetSchema(&descr_, props, key_value_metadata, &result_schema_); } protected: @@ -107,69 +94,68 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { parquet_fields.push_back( PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN)); - arrow_fields.push_back(std::make_shared("boolean", BOOL, false)); + arrow_fields.push_back(::arrow::field("boolean", BOOL, false)); parquet_fields.push_back( PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32)); - arrow_fields.push_back(std::make_shared("int32", INT32, false)); + arrow_fields.push_back(::arrow::field("int32", INT32, false)); parquet_fields.push_back( PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64)); - arrow_fields.push_back(std::make_shared("int64", INT64, false)); + arrow_fields.push_back(::arrow::field("int64", INT64, false)); parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, ParquetType::INT64, ConvertedType::TIMESTAMP_MILLIS)); arrow_fields.push_back( - std::make_shared("timestamp", ::arrow::timestamp(TimeUnit::MILLI), false)); + ::arrow::field("timestamp", ::arrow::timestamp(TimeUnit::MILLI), false)); parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED, ParquetType::INT64, ConvertedType::TIMESTAMP_MICROS)); - arrow_fields.push_back(std::make_shared( - "timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO), false)); + arrow_fields.push_back( + ::arrow::field("timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO), false)); parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED, ParquetType::INT32, ConvertedType::DATE)); - arrow_fields.push_back(std::make_shared("date", ::arrow::date32(), false)); + arrow_fields.push_back(::arrow::field("date", ::arrow::date32(), false)); parquet_fields.push_back(PrimitiveNode::Make( "time32", Repetition::REQUIRED, ParquetType::INT32, ConvertedType::TIME_MILLIS)); arrow_fields.push_back( - std::make_shared("time32", ::arrow::time32(TimeUnit::MILLI), false)); + ::arrow::field("time32", ::arrow::time32(TimeUnit::MILLI), false)); parquet_fields.push_back(PrimitiveNode::Make( "time64", Repetition::REQUIRED, ParquetType::INT64, ConvertedType::TIME_MICROS)); arrow_fields.push_back( - std::make_shared("time64", ::arrow::time64(TimeUnit::MICRO), false)); + ::arrow::field("time64", ::arrow::time64(TimeUnit::MICRO), false)); parquet_fields.push_back( PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96)); - arrow_fields.push_back(std::make_shared("timestamp96", TIMESTAMP_NS, false)); + arrow_fields.push_back(::arrow::field("timestamp96", TIMESTAMP_NS, false)); parquet_fields.push_back( PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT)); - arrow_fields.push_back(std::make_shared("float", FLOAT)); + 
arrow_fields.push_back(::arrow::field("float", FLOAT)); parquet_fields.push_back( PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE)); - arrow_fields.push_back(std::make_shared("double", DOUBLE)); + arrow_fields.push_back(::arrow::field("double", DOUBLE)); parquet_fields.push_back( PrimitiveNode::Make("binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY)); - arrow_fields.push_back(std::make_shared("binary", BINARY)); + arrow_fields.push_back(::arrow::field("binary", BINARY)); parquet_fields.push_back(PrimitiveNode::Make( "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8)); - arrow_fields.push_back(std::make_shared("string", UTF8)); + arrow_fields.push_back(::arrow::field("string", UTF8)); parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL, ParquetType::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, 12)); - arrow_fields.push_back( - std::make_shared("flba-binary", ::arrow::fixed_size_binary(12))); + arrow_fields.push_back(::arrow::field("flba-binary", ::arrow::fixed_size_binary(12))); - auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); + auto arrow_schema = ::arrow::schema(arrow_fields); ASSERT_OK(ConvertSchema(parquet_fields)); ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); @@ -261,11 +247,11 @@ TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) { parquet_fields.push_back(PrimitiveNode::Make(c.name, Repetition::OPTIONAL, c.logical_type, c.physical_type, c.physical_length)); - arrow_fields.push_back(std::make_shared(c.name, c.datatype)); + arrow_fields.push_back(::arrow::field(c.name, c.datatype)); } ASSERT_OK(ConvertSchema(parquet_fields)); - auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); + auto arrow_schema = ::arrow::schema(arrow_fields); ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); } @@ -275,26 +261,15 @@ TEST_F(TestConvertParquetSchema, DuplicateFieldNames) { parquet_fields.push_back( PrimitiveNode::Make("xxx", Repetition::REQUIRED, ParquetType::BOOLEAN)); - auto arrow_field1 = std::make_shared("xxx", BOOL, false); + auto arrow_field1 = ::arrow::field("xxx", BOOL, false); parquet_fields.push_back( PrimitiveNode::Make("xxx", Repetition::REQUIRED, ParquetType::INT32)); - auto arrow_field2 = std::make_shared("xxx", INT32, false); + auto arrow_field2 = ::arrow::field("xxx", INT32, false); ASSERT_OK(ConvertSchema(parquet_fields)); arrow_fields = {arrow_field1, arrow_field2}; - ASSERT_NO_FATAL_FAILURE( - CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields))); - - ASSERT_OK(ConvertSchema(parquet_fields, std::vector({0, 1}))); - arrow_fields = {arrow_field1, arrow_field2}; - ASSERT_NO_FATAL_FAILURE( - CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields))); - - ASSERT_OK(ConvertSchema(parquet_fields, std::vector({1, 0}))); - arrow_fields = {arrow_field2, arrow_field1}; - ASSERT_NO_FATAL_FAILURE( - CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields))); + ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(::arrow::schema(arrow_fields))); } TEST_F(TestConvertParquetSchema, ParquetKeyValueMetadata) { @@ -303,11 +278,11 @@ TEST_F(TestConvertParquetSchema, ParquetKeyValueMetadata) { parquet_fields.push_back( PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN)); - arrow_fields.push_back(std::make_shared("boolean", BOOL, false)); + arrow_fields.push_back(::arrow::field("boolean", BOOL, false)); parquet_fields.push_back( PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32)); - 
arrow_fields.push_back(std::make_shared("int32", INT32, false)); + arrow_fields.push_back(::arrow::field("int32", INT32, false)); auto key_value_metadata = std::make_shared(); key_value_metadata->Append("foo", "bar"); @@ -327,7 +302,7 @@ TEST_F(TestConvertParquetSchema, ParquetEmptyKeyValueMetadata) { parquet_fields.push_back( PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32)); - arrow_fields.push_back(std::make_shared("int32", INT32, false)); + arrow_fields.push_back(::arrow::field("int32", INT32, false)); std::shared_ptr key_value_metadata = nullptr; ASSERT_OK(ConvertSchema(parquet_fields, key_value_metadata)); @@ -343,24 +318,24 @@ TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) { parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL, ParquetType::FIXED_LEN_BYTE_ARRAY, ConvertedType::DECIMAL, 4, 8, 4)); - arrow_fields.push_back(std::make_shared("flba-decimal", DECIMAL_8_4)); + arrow_fields.push_back(::arrow::field("flba-decimal", DECIMAL_8_4)); parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::DECIMAL, -1, 8, 4)); - arrow_fields.push_back(std::make_shared("binary-decimal", DECIMAL_8_4)); + arrow_fields.push_back(::arrow::field("binary-decimal", DECIMAL_8_4)); parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL, ParquetType::INT32, ConvertedType::DECIMAL, -1, 8, 4)); - arrow_fields.push_back(std::make_shared("int32-decimal", DECIMAL_8_4)); + arrow_fields.push_back(::arrow::field("int32-decimal", DECIMAL_8_4)); parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL, ParquetType::INT64, ConvertedType::DECIMAL, -1, 8, 4)); - arrow_fields.push_back(std::make_shared("int64-decimal", DECIMAL_8_4)); + arrow_fields.push_back(::arrow::field("int64-decimal", DECIMAL_8_4)); - auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); + auto arrow_schema = ::arrow::schema(arrow_fields); ASSERT_OK(ConvertSchema(parquet_fields)); ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); @@ -384,9 +359,9 @@ TEST_F(TestConvertParquetSchema, ParquetLists) { auto list = GroupNode::Make("list", Repetition::REPEATED, {element}); parquet_fields.push_back( GroupNode::Make("my_list", Repetition::REQUIRED, {list}, ConvertedType::LIST)); - auto arrow_element = std::make_shared("string", UTF8, true); - auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element); - arrow_fields.push_back(std::make_shared("my_list", arrow_list, false)); + auto arrow_element = ::arrow::field("string", UTF8, true); + auto arrow_list = ::arrow::list(arrow_element); + arrow_fields.push_back(::arrow::field("my_list", arrow_list, false)); } // // List (list nullable, elements non-null) @@ -401,9 +376,9 @@ TEST_F(TestConvertParquetSchema, ParquetLists) { auto list = GroupNode::Make("list", Repetition::REPEATED, {element}); parquet_fields.push_back( GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, ConvertedType::LIST)); - auto arrow_element = std::make_shared("string", UTF8, false); - auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element); - arrow_fields.push_back(std::make_shared("my_list", arrow_list, true)); + auto arrow_element = ::arrow::field("string", UTF8, false); + auto arrow_list = ::arrow::list(arrow_element); + arrow_fields.push_back(::arrow::field("my_list", arrow_list, true)); } // Element types can be nested structures. 
@@ -427,11 +402,11 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
     auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
     parquet_fields.push_back(GroupNode::Make("array_of_arrays", Repetition::OPTIONAL,
                                              {list}, ConvertedType::LIST));
-    auto arrow_inner_element = std::make_shared<Field>("int32", INT32, false);
-    auto arrow_inner_list = std::make_shared<::arrow::ListType>(arrow_inner_element);
-    auto arrow_element = std::make_shared<Field>("element", arrow_inner_list, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("array_of_arrays", arrow_list, true));
+    auto arrow_inner_element = ::arrow::field("int32", INT32, false);
+    auto arrow_inner_list = ::arrow::list(arrow_inner_element);
+    auto arrow_element = ::arrow::field("element", arrow_inner_list, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("array_of_arrays", arrow_list, true));
   }
 
   // // List<String> (list nullable, elements non-null)
@@ -446,9 +421,9 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
     auto list = GroupNode::Make("element", Repetition::REPEATED, {element});
     parquet_fields.push_back(
        GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, ConvertedType::LIST));
-    auto arrow_element = std::make_shared<Field>("str", UTF8, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_element = ::arrow::field("str", UTF8, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   // // List<Integer> (nullable list, non-null elements)
@@ -460,9 +435,9 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
        PrimitiveNode::Make("element", Repetition::REPEATED, ParquetType::INT32);
     parquet_fields.push_back(
        GroupNode::Make("my_list", Repetition::OPTIONAL, {element}, ConvertedType::LIST));
-    auto arrow_element = std::make_shared<Field>("element", INT32, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_element = ::arrow::field("element", INT32, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   // // List<Tuple<String, Integer>> (nullable list, non-null elements)
@@ -481,13 +456,13 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
        GroupNode::Make("element", Repetition::REPEATED, {str_element, num_element});
     parquet_fields.push_back(
        GroupNode::Make("my_list", Repetition::OPTIONAL, {element}, ConvertedType::LIST));
-    auto arrow_str = std::make_shared<Field>("str", UTF8, false);
-    auto arrow_num = std::make_shared<Field>("num", INT32, false);
+    auto arrow_str = ::arrow::field("str", UTF8, false);
+    auto arrow_num = ::arrow::field("num", INT32, false);
     std::vector<std::shared_ptr<Field>> fields({arrow_str, arrow_num});
-    auto arrow_struct = std::make_shared<::arrow::StructType>(fields);
-    auto arrow_element = std::make_shared<Field>("element", arrow_struct, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_struct = ::arrow::struct_(fields);
+    auto arrow_element = ::arrow::field("element", arrow_struct, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   // // List<OneTuple<String>> (nullable list, non-null elements)
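These hunks only swap constructor spellings; the LIST mapping itself is unchanged. For reference, the Arrow-side shape the test expects for the three-level form "optional group my_list (LIST) { repeated group list { required binary element (UTF8); } }" can be built with the same factories the new code uses (MakeExpectedListField is an illustrative helper, not part of the patch):

#include "arrow/api.h"

std::shared_ptr<arrow::Field> MakeExpectedListField() {
  // Non-nullable elements (required binary), nullable list (optional group).
  auto element = arrow::field("string", arrow::utf8(), /*nullable=*/false);
  return arrow::field("my_list", arrow::list(element), /*nullable=*/true);
}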
@@ -503,12 +478,12 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
     auto array = GroupNode::Make("array", Repetition::REPEATED, {element});
     parquet_fields.push_back(
        GroupNode::Make("my_list", Repetition::OPTIONAL, {array}, ConvertedType::LIST));
-    auto arrow_str = std::make_shared<Field>("str", UTF8, false);
+    auto arrow_str = ::arrow::field("str", UTF8, false);
     std::vector<std::shared_ptr<Field>> fields({arrow_str});
-    auto arrow_struct = std::make_shared<::arrow::StructType>(fields);
-    auto arrow_element = std::make_shared<Field>("array", arrow_struct, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_struct = ::arrow::struct_(fields);
+    auto arrow_element = ::arrow::field("array", arrow_struct, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   // // List<OneTuple<String>> (nullable list, non-null elements)
@@ -524,12 +499,12 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
     auto array = GroupNode::Make("my_list_tuple", Repetition::REPEATED, {element});
     parquet_fields.push_back(
        GroupNode::Make("my_list", Repetition::OPTIONAL, {array}, ConvertedType::LIST));
-    auto arrow_str = std::make_shared<Field>("str", UTF8, false);
+    auto arrow_str = ::arrow::field("str", UTF8, false);
     std::vector<std::shared_ptr<Field>> fields({arrow_str});
-    auto arrow_struct = std::make_shared<::arrow::StructType>(fields);
-    auto arrow_element = std::make_shared<Field>("my_list_tuple", arrow_struct, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_struct = ::arrow::struct_(fields);
+    auto arrow_element = ::arrow::field("my_list_tuple", arrow_struct, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   // One-level encoding: Only allows required lists with required cells
@@ -537,12 +512,12 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
   {
     parquet_fields.push_back(
        PrimitiveNode::Make("name", Repetition::REPEATED, ParquetType::INT32));
-    auto arrow_element = std::make_shared<Field>("name", INT32, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("name", arrow_list, false));
+    auto arrow_element = ::arrow::field("name", INT32, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("name", arrow_list, false));
   }
 
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  auto arrow_schema = ::arrow::schema(arrow_fields);
   ASSERT_OK(ConvertSchema(parquet_fields));
 
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
@@ -573,20 +548,20 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchema) {
     parquet_fields.push_back(
        PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64));
 
-    auto group1_fields = {std::make_shared<Field>("leaf1", BOOL, false),
-                          std::make_shared<Field>("leaf2", INT32, false)};
-    auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
-    arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
-    arrow_fields.push_back(std::make_shared<Field>("leaf3", INT64, false));
+    auto group1_fields = {::arrow::field("leaf1", BOOL, false),
+                          ::arrow::field("leaf2", INT32, false)};
+    auto arrow_group1_type = ::arrow::struct_(group1_fields);
+    arrow_fields.push_back(::arrow::field("group1", arrow_group1_type, false));
+    arrow_fields.push_back(::arrow::field("leaf3", INT64, false));
   }
 
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  auto arrow_schema = ::arrow::schema(arrow_fields);
   ASSERT_OK(ConvertSchema(parquet_fields));
 
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
 }
 
-TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) {
+TEST_F(TestConvertParquetSchema, ParquetNestedSchema2) {
   std::vector<NodePtr> parquet_fields;
   std::vector<std::shared_ptr<Field>> arrow_fields;
 
@@ -600,15 +575,6 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) {
   //   required int64 leaf4;
   // }
   // required int64 leaf5;
-  //
-  // Expected partial arrow schema (columns 0, 3, 4):
-  // required group group1 {
-  //   required int64 leaf1;
-  // }
-  // required group group2 {
-  //   required int64 leaf4;
-  // }
-  // required int64 leaf5;
   {
     parquet_fields.push_back(GroupNode::Make(
         "group1", Repetition::REQUIRED,
@@ -621,70 +587,19 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) {
     parquet_fields.push_back(
        PrimitiveNode::Make("leaf5", Repetition::REQUIRED, ParquetType::INT64));
 
-    auto group1_fields = {std::make_shared<Field>("leaf1", INT64, false)};
-    auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
-    auto group2_fields = {std::make_shared<Field>("leaf4", INT64, false)};
-    auto arrow_group2_type = std::make_shared<::arrow::StructType>(group2_fields);
-
-    arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
-    arrow_fields.push_back(std::make_shared<Field>("group2", arrow_group2_type, false));
-    arrow_fields.push_back(std::make_shared<Field>("leaf5", INT64, false));
+    auto group1_fields = {::arrow::field("leaf1", INT64, false),
+                          ::arrow::field("leaf2", INT64, false)};
+    auto arrow_group1_type = ::arrow::struct_(group1_fields);
+    auto group2_fields = {::arrow::field("leaf3", INT64, false),
+                          ::arrow::field("leaf4", INT64, false)};
+    auto arrow_group2_type = ::arrow::struct_(group2_fields);
+    arrow_fields.push_back(::arrow::field("group1", arrow_group1_type, false));
+    arrow_fields.push_back(::arrow::field("group2", arrow_group2_type, false));
+    arrow_fields.push_back(::arrow::field("leaf5", INT64, false));
   }
 
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
-  ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>{0, 3, 4}));
-
-  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
-}
-
-TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartialOrdering) {
-  std::vector<NodePtr> parquet_fields;
-  std::vector<std::shared_ptr<Field>> arrow_fields;
-
-  // Full Parquet Schema:
-  // required group group1 {
-  //   required int64 leaf1;
-  //   required int64 leaf2;
-  // }
-  // required group group2 {
-  //   required int64 leaf3;
-  //   required int64 leaf4;
-  // }
-  // required int64 leaf5;
-  //
-  // Expected partial arrow schema (columns 3, 4, 0):
-  // required group group2 {
-  //   required int64 leaf4;
-  // }
-  // required int64 leaf5;
-  // required group group1 {
-  //   required int64 leaf1;
-  // }
-  {
-    parquet_fields.push_back(GroupNode::Make(
-        "group1", Repetition::REQUIRED,
-        {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT64),
-         PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT64)}));
-    parquet_fields.push_back(GroupNode::Make(
-        "group2", Repetition::REQUIRED,
-        {PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64),
-         PrimitiveNode::Make("leaf4", Repetition::REQUIRED, ParquetType::INT64)}));
-    parquet_fields.push_back(
-        PrimitiveNode::Make("leaf5", Repetition::REQUIRED, ParquetType::INT64));
-
-    auto group1_fields = {std::make_shared<Field>("leaf1", INT64, false)};
-    auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
-    auto group2_fields = {std::make_shared<Field>("leaf4", INT64, false)};
-    auto arrow_group2_type = std::make_shared<::arrow::StructType>(group2_fields);
-
-    arrow_fields.push_back(std::make_shared<Field>("group2", arrow_group2_type, false));
-    arrow_fields.push_back(std::make_shared<Field>("leaf5", INT64, false));
-    arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
-  }
-
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
-  ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>{3, 4, 0}));
-
+  auto arrow_schema = ::arrow::schema(arrow_fields);
+  ASSERT_OK(ConvertSchema(parquet_fields));
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
 }
 
@@ -708,23 +623,21 @@ TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) {
                          "innerGroup", Repetition::REPEATED,
                         {PrimitiveNode::Make("leaf3", Repetition::OPTIONAL,
                                              ParquetType::INT32)})}));
 
-    auto inner_group_fields = {std::make_shared<Field>("leaf3", INT32, true)};
-    auto inner_group_type = std::make_shared<::arrow::StructType>(inner_group_fields);
+    auto inner_group_fields = {::arrow::field("leaf3", INT32, true)};
+    auto inner_group_type = ::arrow::struct_(inner_group_fields);
     auto outer_group_fields = {
-        std::make_shared<Field>("leaf2", INT32, true),
-        std::make_shared<Field>(
+        ::arrow::field("leaf2", INT32, true),
+        ::arrow::field(
            "innerGroup",
-            ::arrow::list(std::make_shared<Field>("innerGroup", inner_group_type, false)),
-            false)};
-    auto outer_group_type = std::make_shared<::arrow::StructType>(outer_group_fields);
+            ::arrow::list(::arrow::field("innerGroup", inner_group_type, false)), false)};
+    auto outer_group_type = ::arrow::struct_(outer_group_fields);
 
-    arrow_fields.push_back(std::make_shared<Field>("leaf1", INT32, true));
-    arrow_fields.push_back(std::make_shared<Field>(
+    arrow_fields.push_back(::arrow::field("leaf1", INT32, true));
+    arrow_fields.push_back(::arrow::field(
        "outerGroup",
-        ::arrow::list(std::make_shared<Field>("outerGroup", outer_group_type, false)),
-        false));
+        ::arrow::list(::arrow::field("outerGroup", outer_group_type, false)), false));
   }
 
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  auto arrow_schema = ::arrow::schema(arrow_fields);
   ASSERT_OK(ConvertSchema(parquet_fields));
 
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
@@ -750,7 +663,7 @@ class TestConvertArrowSchema : public ::testing::Test {
   }
 
   ::arrow::Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
-    arrow_schema_ = std::make_shared<::arrow::Schema>(fields);
+    arrow_schema_ = ::arrow::schema(fields);
     std::shared_ptr<::parquet::WriterProperties> properties =
        ::parquet::default_writer_properties();
     return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_);
@@ -767,51 +680,51 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
 
   parquet_fields.push_back(
      PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
-  arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+  arrow_fields.push_back(::arrow::field("boolean", BOOL, false));
 
   parquet_fields.push_back(
      PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
-  arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+  arrow_fields.push_back(::arrow::field("int32", INT32, false));
 
   parquet_fields.push_back(
      PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
-  arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+  arrow_fields.push_back(::arrow::field("int64", INT64, false));
parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED, ParquetType::INT32, ConvertedType::DATE)); - arrow_fields.push_back(std::make_shared("date", ::arrow::date32(), false)); + arrow_fields.push_back(::arrow::field("date", ::arrow::date32(), false)); parquet_fields.push_back(PrimitiveNode::Make("date64", Repetition::REQUIRED, ParquetType::INT32, ConvertedType::DATE)); - arrow_fields.push_back(std::make_shared("date64", ::arrow::date64(), false)); + arrow_fields.push_back(::arrow::field("date64", ::arrow::date64(), false)); parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, ParquetType::INT64, ConvertedType::TIMESTAMP_MILLIS)); - arrow_fields.push_back(std::make_shared( - "timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false)); + arrow_fields.push_back( + ::arrow::field("timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false)); parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED, ParquetType::INT64, ConvertedType::TIMESTAMP_MICROS)); - arrow_fields.push_back(std::make_shared( - "timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false)); + arrow_fields.push_back( + ::arrow::field("timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false)); parquet_fields.push_back( PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT)); - arrow_fields.push_back(std::make_shared("float", FLOAT)); + arrow_fields.push_back(::arrow::field("float", FLOAT)); parquet_fields.push_back( PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE)); - arrow_fields.push_back(std::make_shared("double", DOUBLE)); + arrow_fields.push_back(::arrow::field("double", DOUBLE)); parquet_fields.push_back(PrimitiveNode::Make( "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8)); - arrow_fields.push_back(std::make_shared("string", UTF8)); + arrow_fields.push_back(::arrow::field("string", UTF8)); parquet_fields.push_back(PrimitiveNode::Make( "binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::NONE)); - arrow_fields.push_back(std::make_shared("binary", BINARY)); + arrow_fields.push_back(::arrow::field("binary", BINARY)); ASSERT_OK(ConvertSchema(arrow_fields)); @@ -920,7 +833,7 @@ TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) { }; for (const FieldConstructionArguments& c : cases) { - auto field = std::make_shared(c.name, c.datatype); + auto field = ::arrow::field(c.name, c.datatype); ASSERT_RAISES(NotImplemented, ConvertSchema({field})); } } @@ -937,38 +850,38 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitivesAsDictionaries) { parquet_fields.push_back( PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64)); - arrow_fields.push_back(std::make_shared( + arrow_fields.push_back(::arrow::field( "int64", ::arrow::dictionary(::arrow::int8(), ::arrow::int64()), false)); parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED, ParquetType::INT32, ConvertedType::DATE)); - arrow_fields.push_back(std::make_shared( + arrow_fields.push_back(::arrow::field( "date", ::arrow::dictionary(::arrow::int8(), ::arrow::date32()), false)); parquet_fields.push_back(PrimitiveNode::Make("date64", Repetition::REQUIRED, ParquetType::INT32, ConvertedType::DATE)); - arrow_fields.push_back(std::make_shared( + arrow_fields.push_back(::arrow::field( "date64", ::arrow::dictionary(::arrow::int8(), ::arrow::date64()), false)); parquet_fields.push_back( PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT)); - 
-  arrow_fields.push_back(std::make_shared<Field>(
-      "float", ::arrow::dictionary(::arrow::int8(), ::arrow::float32())));
+  arrow_fields.push_back(
+      ::arrow::field("float", ::arrow::dictionary(::arrow::int8(), ::arrow::float32())));
 
   parquet_fields.push_back(
      PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
-  arrow_fields.push_back(std::make_shared<Field>(
-      "double", ::arrow::dictionary(::arrow::int8(), ::arrow::float64())));
+  arrow_fields.push_back(
+      ::arrow::field("double", ::arrow::dictionary(::arrow::int8(), ::arrow::float64())));
 
   parquet_fields.push_back(PrimitiveNode::Make(
      "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8));
-  arrow_fields.push_back(std::make_shared<Field>(
-      "string", ::arrow::dictionary(::arrow::int8(), ::arrow::utf8())));
+  arrow_fields.push_back(
+      ::arrow::field("string", ::arrow::dictionary(::arrow::int8(), ::arrow::utf8())));
 
   parquet_fields.push_back(PrimitiveNode::Make(
      "binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::NONE));
-  arrow_fields.push_back(std::make_shared<Field>(
-      "binary", ::arrow::dictionary(::arrow::int8(), ::arrow::binary())));
+  arrow_fields.push_back(
+      ::arrow::field("binary", ::arrow::dictionary(::arrow::int8(), ::arrow::binary())));
 
   ASSERT_OK(ConvertSchema(arrow_fields));
 
@@ -993,9 +906,9 @@ TEST_F(TestConvertArrowSchema, ParquetLists) {
     auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
     parquet_fields.push_back(
        GroupNode::Make("my_list", Repetition::REQUIRED, {list}, ConvertedType::LIST));
-    auto arrow_element = std::make_shared<Field>("string", UTF8, true);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, false));
+    auto arrow_element = ::arrow::field("string", UTF8, true);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, false));
   }
 
   // // List<String> (list nullable, elements non-null)
@@ -1010,9 +923,9 @@ TEST_F(TestConvertArrowSchema, ParquetLists) {
     auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
     parquet_fields.push_back(
        GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, ConvertedType::LIST));
-    auto arrow_element = std::make_shared<Field>("string", UTF8, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_element = ::arrow::field("string", UTF8, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   ASSERT_OK(ConvertSchema(arrow_fields));
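The churn in this test file is mechanical: ::arrow::field(), ::arrow::list(), ::arrow::struct_() and ::arrow::schema() build the same objects as the verbose std::make_shared spellings. A small equivalence sketch (the assert-based check is illustrative):

#include <cassert>
#include <memory>

#include "arrow/api.h"

void FactoriesMatchMakeShared() {
  auto a = std::make_shared<arrow::Field>("int32", arrow::int32(), false);
  auto b = arrow::field("int32", arrow::int32(), false);
  assert(a->Equals(*b));  // same name, type, and nullability
}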
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index e710e3b2c18..11be5713c92 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -19,7 +19,6 @@
 #include <algorithm>
 #include <cstring>
-#include <deque>
 #include <future>
 #include <numeric>
 #include <utility>
 #include <vector>
@@ -27,7 +26,6 @@
 #include <vector>
 
 #include "arrow/array.h"
-#include "arrow/builder.h"
 #include "arrow/record_batch.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
@@ -58,7 +56,9 @@ using arrow::StructArray;
 using arrow::Table;
 using arrow::TimestampArray;
 
+using parquet::schema::GroupNode;
 using parquet::schema::Node;
+using parquet::schema::PrimitiveNode;
 
 // Help reduce verbosity
 using ParquetReader = parquet::ParquetFileReader;
@@ -77,114 +77,37 @@ namespace parquet {
 namespace arrow {
 
 class ColumnChunkReaderImpl;
-class ColumnReaderImpl;
 
-ArrowReaderProperties default_arrow_reader_properties() {
-  static ArrowReaderProperties default_reader_props;
-  return default_reader_props;
-}
-
-// ----------------------------------------------------------------------
-// Iteration utilities
-
-// Abstraction to decouple row group iteration details from the ColumnReader,
-// so we can read only a single row group if we want
-class FileColumnIterator {
+class ColumnReaderImpl : public ColumnReader {
  public:
-  explicit FileColumnIterator(int column_index, ParquetFileReader* reader,
-                              std::vector<int> row_groups)
-      : column_index_(column_index),
-        reader_(reader),
-        schema_(reader->metadata()->schema()),
-        row_groups_(row_groups.begin(), row_groups.end()) {}
-
-  virtual ~FileColumnIterator() {}
-
-  std::unique_ptr<::parquet::PageReader> NextChunk() {
-    if (row_groups_.empty()) {
-      return nullptr;
-    }
-
-    auto row_group_reader = reader_->RowGroup(row_groups_.front());
-    row_groups_.pop_front();
-    return row_group_reader->GetColumnPageReader(column_index_);
-  }
+  enum ReaderType { PRIMITIVE, LIST, STRUCT };
 
-  const SchemaDescriptor* schema() const { return schema_; }
+  virtual Status GetDefLevels(const int16_t** data, int64_t* length) = 0;
+  virtual Status GetRepLevels(const int16_t** data, int64_t* length) = 0;
+  virtual const std::shared_ptr<Field> field() = 0;
 
-  const ColumnDescriptor* descr() const { return schema_->Column(column_index_); }
-
-  std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); }
+  virtual const ColumnDescriptor* descr() const = 0;
 
-  int column_index() const { return column_index_; }
+  virtual ReaderType type() const = 0;
 
- protected:
-  int column_index_;
-  ParquetFileReader* reader_;
-  const SchemaDescriptor* schema_;
-  std::deque<int> row_groups_;
 };
 
-using FileColumnIteratorFactory =
-    std::function<FileColumnIterator*(int, ParquetFileReader*)>;
-
-class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
- public:
-  explicit RowGroupRecordBatchReader(const std::vector<int>& row_group_indices,
-                                     const std::vector<int>& column_indices,
-                                     std::shared_ptr<::arrow::Schema> schema,
-                                     FileReader* reader, int64_t batch_size)
-      : column_readers_(),
-        row_group_indices_(row_group_indices),
-        column_indices_(column_indices),
-        schema_(schema),
-        file_reader_(reader),
-        batch_size_(batch_size) {}
-
-  ~RowGroupRecordBatchReader() override {}
-
-  std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
-
-  Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
-    if (column_readers_.empty()) {
-      // Initialize the column readers
-      column_readers_.reserve(column_indices_.size());
-
-      for (size_t i = 0; i < column_indices_.size(); ++i) {
-        ColumnReaderPtr tmp;
-        RETURN_NOT_OK(file_reader_->GetColumn(column_indices_[i], &tmp));
-        column_readers_.emplace_back(std::move(tmp));
-      }
-    }
-
-    // TODO (hatemhelal): Consider refactoring this to share logic with ReadTable as this
-    // does not currently honor the use_threads option.
-    std::vector<std::shared_ptr<ChunkedArray>> columns(column_indices_.size());
+ArrowReaderProperties default_arrow_reader_properties() {
+  static ArrowReaderProperties default_reader_props;
+  return default_reader_props;
+}
 
-    for (size_t i = 0; i < column_indices_.size(); ++i) {
-      RETURN_NOT_OK(column_readers_[i]->NextBatch(batch_size_, &columns[i]));
-    }
+// ----------------------------------------------------------------------
+// FileReaderImpl forward declaration
 
-    // Create an intermediate table and use TableBatchReader as an adaptor to a
-    // RecordBatch
-    std::shared_ptr<Table> table = Table::Make(schema_, columns);
-    RETURN_NOT_OK(table->Validate());
-    ::arrow::TableBatchReader table_batch_reader(*table);
-    return table_batch_reader.ReadNext(out);
-  }
+namespace {
 
- private:
-  using ColumnReaderPtr = std::unique_ptr<ColumnReader>;
-  std::vector<ColumnReaderPtr> column_readers_;
-  std::vector<int> row_group_indices_;
-  std::vector<int> column_indices_;
-  std::shared_ptr<::arrow::Schema> schema_;
-  FileReader* file_reader_;
-  int64_t batch_size_;
-};
+std::vector<int> Arange(int length) {
+  std::vector<int> result(length);
+  std::iota(result.begin(), result.end(), 0);
+  return result;
+}
 
-// ----------------------------------------------------------------------
-// FileReaderImpl forward declaration
+}  // namespace
 
 class FileReaderImpl : public FileReader {
  public:
@@ -193,8 +116,16 @@ class FileReaderImpl : public FileReader {
       : pool_(pool), reader_(std::move(reader)), reader_properties_(properties) {}
 
   Status Init() {
-    // TODO(wesm): Smarter schema/column-reader initialization for nested data
-    return Status::OK();
+    return BuildSchemaManifest(reader_->metadata()->schema(), reader_properties_,
+                               &manifest_);
+  }
+
+  std::vector<int> AllRowGroups() {
+    return Arange(reader_->metadata()->num_row_groups());
+  }
+
+  std::vector<int> AllColumnIndices() {
+    return Arange(reader_->metadata()->num_columns());
   }
 
   FileColumnIteratorFactory SomeRowGroupsFactory(std::vector<int> row_groups) {
@@ -203,28 +134,36 @@ class FileReaderImpl : public FileReader {
     };
   }
 
-  std::vector<int> AllRowGroups() {
-    std::vector<int> row_groups(reader_->metadata()->num_row_groups());
-    std::iota(row_groups.begin(), row_groups.end(), 0);
-    return row_groups;
+  FileColumnIteratorFactory AllRowGroupsFactory() {
+    return SomeRowGroupsFactory(AllRowGroups());
   }
 
-  std::vector<int> AllColumnIndices() {
-    std::vector<int> indices(reader_->metadata()->num_columns());
-    std::iota(indices.begin(), indices.end(), 0);
-    return indices;
+  Status BoundsCheckColumn(int column) {
+    if (column < 0 || column >= this->num_columns()) {
+      return Status::Invalid("Column index out of bounds (got ", column,
+                             ", should be "
+                             "between 0 and ",
+                             this->num_columns() - 1, ")");
+    }
+    return Status::OK();
  }
 
-  FileColumnIteratorFactory AllRowGroupsFactory() {
-    return SomeRowGroupsFactory(AllRowGroups());
+  Status BoundsCheckRowGroup(int row_group) {
+    // row group indices check
+    if (row_group < 0 || row_group >= num_row_groups()) {
+      return Status::Invalid("Some index in row_group_indices is ", row_group,
+                             ", which is either < 0 or >= num_row_groups(",
+                             num_row_groups(), ")");
+    }
+    return Status::OK();
   }
 
   int64_t GetTotalRecords(const std::vector<int>& row_groups, int column_chunk = 0) {
     // Can throw exception
     int64_t records = 0;
-    for (int j = 0; j < static_cast<int>(row_groups.size()); j++) {
+    for (auto row_group : row_groups) {
       records += reader_->metadata()
-                     ->RowGroup(row_groups[j])
+                     ->RowGroup(row_group)
                      ->ColumnChunk(column_chunk)
                      ->num_values();
     }
@@ -233,18 +172,43 @@ class FileReaderImpl : public FileReader {
 
   std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override;
 
-  Status GetReaderForNode(int index, const Node* node, const std::vector<int>& indices,
-                          int16_t def_level, FileColumnIteratorFactory iterator_factory,
-                          std::unique_ptr<ColumnReaderImpl>* out);
-
   Status ReadTable(const std::vector<int>& indices,
                    std::shared_ptr<Table>* out) override {
     return ReadRowGroups(AllRowGroups(), indices, out);
   }
 
+  Status GetFieldReader(int i, const std::vector<int>& indices,
+                        const std::vector<int>& row_groups,
+                        std::unique_ptr<ColumnReaderImpl>* out) {
+    ReaderContext ctx;
+    ctx.reader = reader_.get();
+    ctx.pool = pool_;
+    ctx.iterator_factory = SomeRowGroupsFactory(row_groups);
+    ctx.filter_leaves = true;
+    ctx.included_leaves.insert(indices.begin(), indices.end());
+    return manifest_.schema_fields[i].GetReader(ctx, out);
+  }
+
   Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
                    std::unique_ptr<ColumnReader>* out);
 
+  Status ReadSchemaField(int i, const std::vector<int>& indices,
+                         const std::vector<int>& row_groups,
+                         std::shared_ptr<Field>* out_field,
+                         std::shared_ptr<ChunkedArray>* out) {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    std::unique_ptr<ColumnReaderImpl> reader;
+    RETURN_NOT_OK(GetFieldReader(i, indices, row_groups, &reader));
+
+    *out_field = reader->field();
+
+    // TODO(wesm): This calculation doesn't make much sense when we have repeated
+    // schema nodes
+    int64_t records_to_read = GetTotalRecords(row_groups, i);
+    return reader->NextBatch(records_to_read, out);
+    END_PARQUET_CATCH_EXCEPTIONS
+  }
+
   Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) override {
     return GetColumn(i, AllRowGroupsFactory(), out);
   }
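The new GetFieldReader/ReadSchemaField plumbing is what backs the public single-field read path. A usage sketch against an opened reader (ReadFirstField is an illustrative name, not part of the patch):

// Read top-level schema field 0 in full, however many leaf columns it spans.
arrow::Status ReadFirstField(parquet::arrow::FileReader* reader) {
  std::shared_ptr<arrow::ChunkedArray> column;
  ARROW_RETURN_NOT_OK(reader->ReadSchemaField(0, &column));
  return arrow::Status::OK();
}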
@@ -254,15 +218,12 @@ class FileReaderImpl : public FileReader {
                              reader_->metadata()->key_value_metadata(), out);
   }
 
-  Status GetSchema(const std::vector<int>& indices,
-                   std::shared_ptr<::arrow::Schema>* out) override {
-    return FromParquetSchema(reader_->metadata()->schema(), indices, reader_properties_,
-                             reader_->metadata()->key_value_metadata(), out);
-  }
-
   Status ReadSchemaField(int i, const std::vector<int>& indices,
                          const std::vector<int>& row_groups,
-                         std::shared_ptr<ChunkedArray>* out);
+                         std::shared_ptr<ChunkedArray>* out) {
+    std::shared_ptr<Field> unused;
+    return ReadSchemaField(i, indices, row_groups, &unused, out);
+  }
 
   Status ReadSchemaField(int i, const std::vector<int>& indices,
                          std::shared_ptr<ChunkedArray>* out) {
@@ -288,11 +249,7 @@ class FileReaderImpl : public FileReader {
   }
 
   Status ReadTable(std::shared_ptr<Table>* table) override {
-    std::vector<int> indices(reader_->metadata()->num_columns());
-    for (size_t i = 0; i < indices.size(); ++i) {
-      indices[i] = static_cast<int>(i);
-    }
-    return ReadTable(indices, table);
+    return ReadTable(AllColumnIndices(), table);
   }
 
   Status ReadRowGroups(const std::vector<int>& row_groups,
@@ -313,54 +270,13 @@ class FileReaderImpl : public FileReader {
     return ReadRowGroup(i, AllColumnIndices(), table);
   }
 
-  std::vector<int> GetDictionaryIndices(const std::vector<int>& indices) {
-    // Select the column indices that were read as DictionaryArray
-    std::vector<int> dict_indices(indices);
-    auto remove_func = [this](int i) { return !reader_properties_.read_dictionary(i); };
-    auto it = std::remove_if(dict_indices.begin(), dict_indices.end(), remove_func);
-    dict_indices.erase(it, dict_indices.end());
-    return dict_indices;
-  }
-
-  std::shared_ptr<::arrow::Schema> FixSchema(
-      const ::arrow::Schema& old_schema, const std::vector<int>& dict_indices,
-      const std::vector<std::shared_ptr<ChunkedArray>>& columns) {
-    // Fix the schema with the actual DictionaryType that was read
-    auto fields = old_schema.fields();
-
-    for (int idx : dict_indices) {
-      fields[idx] = old_schema.field(idx)->WithType(columns[idx]->type());
-    }
-    return std::make_shared<::arrow::Schema>(fields, old_schema.metadata());
-  }
-
   Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
-                              std::shared_ptr<RecordBatchReader>* out) override {
-    return GetRecordBatchReader(row_group_indices, AllColumnIndices(), out);
-  }
-
-  Status BoundsCheckRowGroup(int row_group) {
-    // row group indices check
-    if (row_group < 0 || row_group >= num_row_groups()) {
-      return Status::Invalid("Some index in row_group_indices is ", row_group,
-                             ", which is either < 0 or >= num_row_groups(",
-                             num_row_groups(), ")");
-    }
-    return Status::OK();
-  }
+                              const std::vector<int>& column_indices,
+                              std::shared_ptr<RecordBatchReader>* out) override;
 
   Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
-                              const std::vector<int>& column_indices,
                               std::shared_ptr<RecordBatchReader>* out) override {
-    // column indices check
-    std::shared_ptr<::arrow::Schema> schema;
-    RETURN_NOT_OK(GetSchema(column_indices, &schema));
-    for (auto row_group_index : row_group_indices) {
-      RETURN_NOT_OK(BoundsCheckRowGroup(row_group_index));
-    }
-    *out = std::make_shared<RowGroupRecordBatchReader>(row_group_indices, column_indices,
-                                                       schema, this, batch_size());
-    return Status::OK();
+    return GetRecordBatchReader(row_group_indices, AllColumnIndices(), out);
   }
 
   int num_columns() const { return reader_->metadata()->num_columns(); }
@@ -373,8 +289,6 @@ class FileReaderImpl : public FileReader {
     reader_properties_.set_use_threads(use_threads);
   }
 
-  int64_t batch_size() const { return reader_properties_.batch_size(); }
-
   Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
                       int64_t* num_rows) override {
     BEGIN_PARQUET_CATCH_EXCEPTIONS
@@ -383,10 +297,68 @@ class FileReaderImpl : public FileReader {
     END_PARQUET_CATCH_EXCEPTIONS
   }
 
- private:
   MemoryPool* pool_;
   std::unique_ptr<ParquetFileReader> reader_;
   ArrowReaderProperties reader_properties_;
+
+  SchemaManifest manifest_;
 };
+
+class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
+ public:
+  RowGroupRecordBatchReader(std::vector<std::unique_ptr<ColumnReaderImpl>> field_readers,
+                            std::shared_ptr<::arrow::Schema> schema, int64_t batch_size)
+      : field_readers_(std::move(field_readers)),
+        schema_(schema),
+        batch_size_(batch_size) {}
+
+  ~RowGroupRecordBatchReader() override {}
+
+  std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
+
+  static Status Make(const std::vector<int>& row_groups,
+                     const std::vector<int>& column_indices, FileReaderImpl* reader,
+                     int64_t batch_size,
+                     std::shared_ptr<::arrow::RecordBatchReader>* out) {
+    std::vector<int> field_indices;
+    if (!reader->manifest_.GetFieldIndices(column_indices, &field_indices)) {
+      return Status::Invalid("Invalid column index");
+    }
+    std::vector<std::unique_ptr<ColumnReaderImpl>> field_readers(field_indices.size());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < field_indices.size(); ++i) {
+      RETURN_NOT_OK(reader->GetFieldReader(field_indices[i], column_indices, row_groups,
+                                           &field_readers[i]));
+      fields.push_back(field_readers[i]->field());
+    }
+    *out = std::make_shared<RowGroupRecordBatchReader>(
+        std::move(field_readers), ::arrow::schema(fields), batch_size);
+    return Status::OK();
+  }
+
+  Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
+    // TODO (hatemhelal): Consider refactoring this to share logic with ReadTable as this
+    // does not currently honor the use_threads option.
+    std::vector<std::shared_ptr<ChunkedArray>> columns(field_readers_.size());
+    for (size_t i = 0; i < field_readers_.size(); ++i) {
+      RETURN_NOT_OK(field_readers_[i]->NextBatch(batch_size_, &columns[i]));
+      if (columns[i]->num_chunks() > 1) {
+        return Status::NotImplemented("This class cannot yet iterate chunked arrays");
+      }
+    }
+
+    // Create an intermediate table and use TableBatchReader as an adaptor to a
+    // RecordBatch
+    std::shared_ptr<Table> table = Table::Make(schema_, columns);
+    RETURN_NOT_OK(table->Validate());
+    ::arrow::TableBatchReader table_batch_reader(*table);
+    return table_batch_reader.ReadNext(out);
+  }
+
+ private:
+  std::vector<std::unique_ptr<ColumnReaderImpl>> field_readers_;
+  std::shared_ptr<::arrow::Schema> schema_;
+  int64_t batch_size_;
+};
 
 class ColumnChunkReaderImpl : public ColumnChunkReader {
@@ -428,34 +400,26 @@ class RowGroupReaderImpl : public RowGroupReader {
   int row_group_index_;
 };
 
-class ColumnReaderImpl : public ColumnReader {
+// Leaf reader is for primitive arrays and primitive children of nested arrays
+class LeafReader : public ColumnReaderImpl {
  public:
-  virtual Status GetDefLevels(const int16_t** data, size_t* length) = 0;
-  virtual Status GetRepLevels(const int16_t** data, size_t* length) = 0;
-  virtual const std::shared_ptr<Field> field() = 0;
-};
-
-// Reader implementation for primitive arrays
-class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReaderImpl {
- public:
-  PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input,
-                bool read_dictionary)
-      : pool_(pool), input_(std::move(input)), descr_(input_->descr()) {
-    record_reader_ = RecordReader::Make(descr_, pool_, read_dictionary);
-    Status s = NodeToField(*input_->descr()->schema_node(), &field_);
-    DCHECK_OK(s);
+  LeafReader(const ReaderContext& ctx, const std::shared_ptr<Field>& field,
+             std::unique_ptr<FileColumnIterator> input)
+      : ctx_(ctx), field_(field), input_(std::move(input)), descr_(input_->descr()) {
+    record_reader_ = RecordReader::Make(descr_, ctx_.pool,
+                                        field->type()->id() == ::arrow::Type::DICTIONARY);
     NextRowGroup();
   }
 
-  Status GetDefLevels(const int16_t** data, size_t* length) override {
+  Status GetDefLevels(const int16_t** data, int64_t* length) override {
     *data = record_reader_->def_levels();
-    *length = record_reader_->levels_written();
+    *length = record_reader_->levels_position();
     return Status::OK();
   }
 
-  Status GetRepLevels(const int16_t** data, size_t* length) override {
+  Status GetRepLevels(const int16_t** data, int64_t* length) override {
    *data = record_reader_->rep_levels();
-    *length = record_reader_->levels_written();
+    *length = record_reader_->levels_position();
    return Status::OK();
  }
 
@@ -477,17 +441,15 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReaderImpl {
       }
     }
     RETURN_NOT_OK(
-        TransferColumnData(record_reader_.get(), field_->type(), descr_, pool_, out));
-
-    // Nest nested types, if not nested returns unmodified
-    RETURN_NOT_OK(WrapIntoListArray(out));
+        TransferColumnData(record_reader_.get(), field_->type(), descr_, ctx_.pool, out));
     return Status::OK();
     END_PARQUET_CATCH_EXCEPTIONS
   }
 
-  Status WrapIntoListArray(std::shared_ptr<ChunkedArray>* inout_array);
-
   const std::shared_ptr<Field> field() override { return field_; }
 
+  const ColumnDescriptor* descr() const override { return descr_; }
+
+  ReaderType type() const override { return PRIMITIVE; }
+
  private:
   void NextRowGroup() {
@@ -495,188 +457,111 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReaderImpl {
     record_reader_->SetPageReader(std::move(page_reader));
   }
 
-  MemoryPool* pool_;
+  ReaderContext ctx_;
+  std::shared_ptr<Field> field_;
   std::unique_ptr<FileColumnIterator> input_;
   const ColumnDescriptor* descr_;
-
   std::shared_ptr<RecordReader> record_reader_;
-  std::shared_ptr<Field> field_;
 };
 
-Status PrimitiveImpl::WrapIntoListArray(std::shared_ptr<ChunkedArray>* inout_array) {
-  if (descr_->max_repetition_level() == 0) {
-    // Flat, no action
-    return Status::OK();
-  }
-
-  std::shared_ptr<Array> flat_array;
+class NestedListReader : public ColumnReaderImpl {
+ public:
+  NestedListReader(const ReaderContext& ctx, std::shared_ptr<Field> field,
int16_t max_repetition_level, + std::unique_ptr item_reader) + : ctx_(ctx), + field_(field), + max_definition_level_(max_definition_level), + max_repetition_level_(max_repetition_level), + item_reader_(std::move(item_reader)) {} - // ARROW-3762(wesm): If inout_array is a chunked array, we reject as this is - // not yet implemented - if ((*inout_array)->num_chunks() > 1) { - return Status::NotImplemented( - "Nested data conversions not implemented for " - "chunked array outputs"); + Status GetDefLevels(const int16_t** data, int64_t* length) override { + return item_reader_->GetDefLevels(data, length); } - flat_array = (*inout_array)->chunk(0); - - const int16_t* def_levels = record_reader_->def_levels(); - const int16_t* rep_levels = record_reader_->rep_levels(); - const int64_t total_levels_read = record_reader_->levels_position(); - std::shared_ptr<::arrow::Schema> arrow_schema; - RETURN_NOT_OK(FromParquetSchema( - input_->schema(), {input_->column_index()}, default_arrow_reader_properties(), - input_->metadata()->key_value_metadata(), &arrow_schema)); - std::shared_ptr current_field = arrow_schema->field(0); - - if (current_field->type()->num_children() > 0 && - flat_array->type_id() == ::arrow::Type::DICTIONARY) { - // XXX(wesm): Handling of nested types and dictionary encoding needs to be - // significantly refactored - return Status::Invalid("Cannot have nested types containing dictionary arrays yet"); + Status GetRepLevels(const int16_t** data, int64_t* length) override { + return item_reader_->GetRepLevels(data, length); } - // Walk downwards to extract nullability - std::vector nullable; - std::vector> offset_builders; - std::vector> valid_bits_builders; - nullable.push_back(current_field->nullable()); - while (current_field->type()->num_children() > 0) { - if (current_field->type()->num_children() > 1) { - return Status::NotImplemented("Fields with more than one child are not supported."); - } else { - if (current_field->type()->id() != ::arrow::Type::LIST) { - return Status::NotImplemented("Currently only nesting with Lists is supported."); - } - current_field = current_field->type()->child(0); - } - offset_builders.emplace_back( - std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool_)); - valid_bits_builders.emplace_back( - std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool_)); - nullable.push_back(current_field->nullable()); - } - - int64_t list_depth = offset_builders.size(); - // This describes the minimal definition that describes a level that - // reflects a value in the primitive values array. - int16_t values_def_level = descr_->max_definition_level(); - if (nullable[nullable.size() - 1]) { - values_def_level--; - } - - // The definition levels that are needed so that a list is declared - // as empty and not null. 
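
The level arithmetic in the removed block above, and in its replacement ReconstructNestedList in reader_internal.cc, is easier to follow against a concrete schema. An illustrative example, not part of this patch: a single optional list of optional int32 values, so max_definition_level = 3 and max_repetition_level = 1:

    // optional group my_list (LIST) {
    //   repeated group list { optional int32 item; }
    // }
    //
    // def_level 0  ->  my_list itself is null
    // def_level 1  ->  my_list is the empty list
    // def_level 2  ->  item is present but null
    // def_level 3  ->  item holds a value
    // rep_level 0  ->  value starts a new list
    // rep_level 1  ->  value continues the current list
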
- std::vector empty_def_level(list_depth); - int def_level = 0; - for (int i = 0; i < list_depth; i++) { - if (nullable[i]) { - def_level++; - } - empty_def_level[i] = static_cast(def_level); - def_level++; - } - - int32_t values_offset = 0; - std::vector null_counts(list_depth, 0); - for (int64_t i = 0; i < total_levels_read; i++) { - int16_t rep_level = rep_levels[i]; - if (rep_level < descr_->max_repetition_level()) { - for (int64_t j = rep_level; j < list_depth; j++) { - if (j == (list_depth - 1)) { - RETURN_NOT_OK(offset_builders[j]->Append(values_offset)); - } else { - RETURN_NOT_OK(offset_builders[j]->Append( - static_cast(offset_builders[j + 1]->length()))); - } - - if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) { - RETURN_NOT_OK(valid_bits_builders[j]->Append(false)); - null_counts[j]++; - break; - } else { - RETURN_NOT_OK(valid_bits_builders[j]->Append(true)); - if (empty_def_level[j] == def_levels[i]) { - break; - } - } - } - } - if (def_levels[i] >= values_def_level) { - values_offset++; + Status NextBatch(int64_t records_to_read, std::shared_ptr* out) override { + if (item_reader_->type() == ColumnReaderImpl::STRUCT) { + return Status::Invalid("Mix of struct and list types not yet supported"); } - } - // Add the final offset to all lists - for (int64_t j = 0; j < list_depth; j++) { - if (j == (list_depth - 1)) { - RETURN_NOT_OK(offset_builders[j]->Append(values_offset)); - } else { - RETURN_NOT_OK(offset_builders[j]->Append( - static_cast(offset_builders[j + 1]->length()))); + + RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out)); + + // ARROW-3762(wesm): If item reader yields a chunked array, we reject as + // this is not yet implemented + if ((*out)->num_chunks() > 1) { + return Status::NotImplemented( + "Nested data conversions not implemented for chunked array outputs"); } - } - std::vector> offsets; - std::vector> valid_bits; - std::vector list_lengths; - for (int64_t j = 0; j < list_depth; j++) { - list_lengths.push_back(offset_builders[j]->length() - 1); - std::shared_ptr array; - RETURN_NOT_OK(offset_builders[j]->Finish(&array)); - offsets.emplace_back(std::static_pointer_cast(array)->values()); - RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array)); - valid_bits.emplace_back(std::static_pointer_cast(array)->values()); + const int16_t* def_levels; + const int16_t* rep_levels; + int64_t num_levels; + RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels)); + RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels)); + std::shared_ptr result; + RETURN_NOT_OK(ReconstructNestedList((*out)->chunk(0), field_, max_definition_level_, + max_repetition_level_, def_levels, rep_levels, + num_levels, ctx_.pool, &result)); + *out = std::make_shared(result); + return Status::OK(); } - std::shared_ptr output = flat_array; - for (int64_t j = list_depth - 1; j >= 0; j--) { - auto list_type = - ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1])); - output = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j], - output, valid_bits[j], null_counts[j]); - } - *inout_array = std::make_shared(output); - return Status::OK(); -} + const std::shared_ptr field() override { return field_; } + + const ColumnDescriptor* descr() const override { return nullptr; } -// Reader implementation for struct array + ReaderType type() const override { return LIST; } + + private: + ReaderContext ctx_; + std::shared_ptr field_; + int16_t max_definition_level_; + int16_t max_repetition_level_; + std::unique_ptr 
item_reader_; +}; -class PARQUET_NO_EXPORT StructImpl : public ColumnReaderImpl { +class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl { public: - explicit StructImpl(const std::vector>& children, - int16_t struct_def_level, MemoryPool* pool, const Node* node) - : children_(children), struct_def_level_(struct_def_level), pool_(pool) { - InitField(node, children); - } + explicit StructReader(const ReaderContext& ctx, const SchemaField& schema_field, + std::shared_ptr filtered_field, + std::vector>&& children) + : ctx_(ctx), + schema_field_(schema_field), + filtered_field_(filtered_field), + struct_def_level_(schema_field.max_definition_level), + children_(std::move(children)) {} Status NextBatch(int64_t records_to_read, std::shared_ptr* out) override; - Status GetDefLevels(const int16_t** data, size_t* length) override; - Status GetRepLevels(const int16_t** data, size_t* length) override; - const std::shared_ptr field() override { return field_; } + Status GetDefLevels(const int16_t** data, int64_t* length) override; + Status GetRepLevels(const int16_t** data, int64_t* length) override; + const std::shared_ptr field() override { return filtered_field_; } + const ColumnDescriptor* descr() const override { return nullptr; } + ReaderType type() const override { return STRUCT; } private: - std::vector> children_; + ReaderContext ctx_; + SchemaField schema_field_; + std::shared_ptr filtered_field_; int16_t struct_def_level_; - MemoryPool* pool_; - std::shared_ptr field_; + std::vector> children_; std::shared_ptr def_levels_buffer_; - Status DefLevelsToNullArray(std::shared_ptr* null_bitmap, int64_t* null_count); - void InitField(const Node* node, - const std::vector>& children); }; -Status StructImpl::DefLevelsToNullArray(std::shared_ptr* null_bitmap_out, - int64_t* null_count_out) { +Status StructReader::DefLevelsToNullArray(std::shared_ptr* null_bitmap_out, + int64_t* null_count_out) { std::shared_ptr null_bitmap; auto null_count = 0; const int16_t* def_levels_data; - size_t def_levels_length; + int64_t def_levels_length; RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length)); - RETURN_NOT_OK(AllocateEmptyBitmap(pool_, def_levels_length, &null_bitmap)); + RETURN_NOT_OK(AllocateEmptyBitmap(ctx_.pool, def_levels_length, &null_bitmap)); uint8_t* null_bitmap_ptr = null_bitmap->mutable_data(); - for (size_t i = 0; i < def_levels_length; i++) { + for (int64_t i = 0; i < def_levels_length; i++) { if (def_levels_data[i] < struct_def_level_) { // Mark null null_count += 1; @@ -693,7 +578,7 @@ Status StructImpl::DefLevelsToNullArray(std::shared_ptr* null_bitmap_out // TODO(itaiin): Consider caching the results of this calculation - // note that this is only used once for each read for now -Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) { +Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) { *data = nullptr; if (children_.size() == 0) { // Empty struct @@ -703,10 +588,10 @@ Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) { // We have at least one child const int16_t* child_def_levels; - size_t child_length; + int64_t child_length; RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length)); auto size = child_length * sizeof(int16_t); - RETURN_NOT_OK(AllocateResizableBuffer(pool_, size, &def_levels_buffer_)); + RETURN_NOT_OK(AllocateResizableBuffer(ctx_.pool, size, &def_levels_buffer_)); // Initialize with the minimal def level std::memset(def_levels_buffer_->mutable_data(), -1, size); auto 
result_levels = reinterpret_cast(def_levels_buffer_->mutable_data()); @@ -717,10 +602,10 @@ Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) { // the nesting level, and the def level equals max(children def levels) // All other possibilities are malformed definition data. for (auto& child : children_) { - size_t current_child_length; + int64_t current_child_length; RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, ¤t_child_length)); DCHECK_EQ(child_length, current_child_length); - for (size_t i = 0; i < child_length; i++) { + for (int64_t i = 0; i < child_length; i++) { // Check that value is either uninitialized, or current // and previous children def levels agree on the struct level DCHECK((result_levels[i] == -1) || ((result_levels[i] >= struct_def_level_) == @@ -730,34 +615,26 @@ Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) { } } *data = reinterpret_cast(def_levels_buffer_->data()); - *length = child_length; + *length = static_cast(child_length); return Status::OK(); } -void StructImpl::InitField( - const Node* node, const std::vector>& children) { - // Make a shallow node to field conversion from the children fields - std::vector> fields(children.size()); - for (size_t i = 0; i < children.size(); i++) { - fields[i] = children[i]->field(); - } - - auto type = ::arrow::struct_(fields); - field_ = ::arrow::field(node->name(), type, node->is_optional()); -} - -Status StructImpl::GetRepLevels(const int16_t** data, size_t* length) { +Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) { return Status::NotImplemented("GetRepLevels is not implemented for struct"); } -Status StructImpl::NextBatch(int64_t records_to_read, - std::shared_ptr* out) { +Status StructReader::NextBatch(int64_t records_to_read, + std::shared_ptr* out) { std::vector> children_arrays; std::shared_ptr null_bitmap; int64_t null_count; // Gather children arrays and def levels for (auto& child : children_) { + if (child->type() == ColumnReaderImpl::LIST) { + return Status::Invalid("Mix of struct and list types not yet supported"); + } + std::shared_ptr field; RETURN_NOT_OK(child->NextBatch(records_to_read, &field)); @@ -787,57 +664,85 @@ Status StructImpl::NextBatch(int64_t records_to_read, // ---------------------------------------------------------------------- // File reader implementation -Status FileReaderImpl::GetReaderForNode(int index, const Node* node, - const std::vector& indices, - int16_t def_level, - FileColumnIteratorFactory iterator_factory, - std::unique_ptr* out) { - *out = nullptr; - - if (schema::IsSimpleStruct(node)) { - const schema::GroupNode* group = static_cast(node); - std::vector> children; - for (int i = 0; i < group->field_count(); i++) { - std::unique_ptr child_reader; - // TODO(itaiin): Remove the -1 index hack when all types of nested reads - // are supported. 
This currently just signals the lower level reader resolution - // to abort - RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices, - static_cast(def_level + 1), - iterator_factory, &child_reader)); - if (child_reader != nullptr) { - children.push_back(std::move(child_reader)); - } +Status SchemaField::GetReader(const ReaderContext& ctx, + std::unique_ptr* out) const { + auto type_id = this->field->type()->id(); + if (this->children.size() == 0) { + std::unique_ptr input( + ctx.iterator_factory(this->column_index, ctx.reader)); + out->reset(new LeafReader(ctx, this->field, std::move(input))); + } else if (type_id == ::arrow::Type::LIST) { + // We can only read lists-of-lists or structs at the moment + auto list_field = this->field; + auto child = &this->children[0]; + while (child->field->type()->id() == ::arrow::Type::LIST) { + child = &child->children[0]; } - - if (children.size() > 0) { - *out = std::unique_ptr( - new StructImpl(children, def_level, pool_, node)); + if (child->field->type()->id() == ::arrow::Type::STRUCT) { + return Status::NotImplemented("Lists of structs not yet supported"); } - } else { - // This should be a flat field case - translate the field index to - // the correct column index by walking down to the leaf node - const Node* walker = node; - while (!walker->is_primitive()) { - DCHECK(walker->is_group()); - auto group = static_cast(walker); - if (group->field_count() != 1) { - return Status::NotImplemented("lists with structs are not supported."); + if (!ctx.IncludesLeaf(child->column_index)) { + *out = nullptr; + return Status::OK(); + } + std::unique_ptr child_reader; + RETURN_NOT_OK(child->GetReader(ctx, &child_reader)); + // Use the max definition/repetition level of the leaf here + out->reset(new NestedListReader(ctx, list_field, child->max_definition_level, + child->max_repetition_level, + std::move(child_reader))); + } else if (type_id == ::arrow::Type::STRUCT) { + std::vector> child_fields; + std::vector> child_readers; + for (const auto& child : this->children) { + if (child.is_leaf() && !ctx.IncludesLeaf(child.column_index)) { + // Excluded leaf + continue; + } + std::unique_ptr child_reader; + RETURN_NOT_OK(child.GetReader(ctx, &child_reader)); + if (!child_reader) { + // If all children were pruned, then we do not try to read this field + continue; } - walker = group->field(0).get(); + child_fields.push_back(child.field); + child_readers.emplace_back(std::move(child_reader)); } - auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker); - - // If the index of the column is found then a reader for the column is needed. - // Otherwise *out keeps the nullptr value. 
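
In the new code this include-or-prune decision is made by ReaderContext::IncludesLeaf, declared later in this patch in reader_internal.h. A minimal sketch of configuring that context, where the leaf indices are physical Parquet column indices:

    ReaderContext ctx;
    ctx.filter_leaves = true;      // read only a subset of leaf columns
    ctx.included_leaves = {0, 2};  // keep leaves 0 and 2
    // ctx.IncludesLeaf(1) is now false, so a subtree whose leaves are all
    // excluded resolves to a null reader and is pruned from the output.
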
- if (std::find(indices.begin(), indices.end(), column_index) != indices.end()) { - std::unique_ptr reader; - RETURN_NOT_OK(GetColumn(column_index, iterator_factory, &reader)); - *out = std::unique_ptr( - static_cast(reader.release())); + if (child_fields.size() == 0) { + *out = nullptr; + return Status::OK(); } + auto filtered_field = ::arrow::field( + this->field->name(), ::arrow::struct_(child_fields), this->field->nullable()); + out->reset(new StructReader(ctx, *this, filtered_field, std::move(child_readers))); + } else { + return Status::Invalid("Unsupported nested type: ", this->field->ToString()); } + return Status::OK(); +} +Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_group_indices, + const std::vector& column_indices, + std::shared_ptr* out) { + // column indices check + for (auto row_group_index : row_group_indices) { + RETURN_NOT_OK(BoundsCheckRowGroup(row_group_index)); + } + return RowGroupRecordBatchReader::Make(row_group_indices, column_indices, this, + reader_properties_.batch_size(), out); +} + +Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory, + std::unique_ptr* out) { + RETURN_NOT_OK(BoundsCheckColumn(i)); + ReaderContext ctx; + ctx.reader = reader_.get(); + ctx.pool = pool_; + ctx.iterator_factory = AllRowGroupsFactory(); + ctx.filter_leaves = false; + std::unique_ptr result; + RETURN_NOT_OK(manifest_.schema_fields[i].GetReader(ctx, &result)); + out->reset(result.release()); return Status::OK(); } @@ -846,21 +751,20 @@ Status FileReaderImpl::ReadRowGroups(const std::vector& row_groups, std::shared_ptr
* out) {
   BEGIN_PARQUET_CATCH_EXCEPTIONS
 
-  std::shared_ptr<::arrow::Schema> schema;
-  RETURN_NOT_OK(GetSchema(indices, &schema));
-
   // We only need to read schema fields which have columns indicated
   // in the indices vector
   std::vector<int> field_indices;
-  if (!schema::ColumnIndicesToFieldIndices(*reader_->metadata()->schema(), indices,
-                                           &field_indices)) {
+  if (!manifest_.GetFieldIndices(indices, &field_indices)) {
     return Status::Invalid("Invalid column index");
   }
+
   int num_fields = static_cast<int>(field_indices.size());
+  std::vector<std::shared_ptr<::arrow::Field>> fields(num_fields);
   std::vector<std::shared_ptr<ChunkedArray>> columns(num_fields);
   auto ReadColumnFunc = [&](int i) {
-    return ReadSchemaField(field_indices[i], indices, row_groups, &columns[i]);
+    return ReadSchemaField(field_indices[i], indices, row_groups, &fields[i],
+                           &columns[i]);
   };
 
   if (reader_properties_.use_threads()) {
@@ -883,49 +787,9 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
     }
   }
 
-  auto dict_indices = GetDictionaryIndices(indices);
-  if (!dict_indices.empty()) {
-    schema = FixSchema(*schema, dict_indices, columns);
-  }
-  std::shared_ptr<Table>
table = Table::Make(schema, columns); - RETURN_NOT_OK(table->Validate()); - *out = table; - return Status::OK(); - END_PARQUET_CATCH_EXCEPTIONS -} - -Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory, - std::unique_ptr* out) { - if (i < 0 || i >= this->num_columns()) { - return Status::Invalid("Column index out of bounds (got ", i, - ", should be " - "between 0 and ", - this->num_columns() - 1, ")"); - } - - std::unique_ptr input(iterator_factory(i, reader_.get())); - *out = std::unique_ptr( - new PrimitiveImpl(pool_, std::move(input), reader_properties_.read_dictionary(i))); - return Status::OK(); -} - -Status FileReaderImpl::ReadSchemaField(int i, const std::vector& indices, - const std::vector& row_groups, - std::shared_ptr* out) { - BEGIN_PARQUET_CATCH_EXCEPTIONS - auto parquet_schema = reader_->metadata()->schema(); - auto node = parquet_schema->group_node()->field(i).get(); - std::unique_ptr reader_impl; - RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, SomeRowGroupsFactory(row_groups), - &reader_impl)); - if (reader_impl == nullptr) { - *out = nullptr; - return Status::OK(); - } - // TODO(wesm): This calculation doesn't make much sense when we have repeated - // schema nodes - int64_t records_to_read = GetTotalRecords(row_groups, i); - return reader_impl->NextBatch(records_to_read, out); + auto result_schema = ::arrow::schema(fields, reader_->metadata()->key_value_metadata()); + *out = Table::Make(result_schema, columns); + return (*out)->Validate(); END_PARQUET_CATCH_EXCEPTIONS } diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 5b2f2c71fd5..e73fcc0cb16 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -28,6 +28,7 @@ namespace arrow { class ChunkedArray; +class KeyValueMetadata; class RecordBatchReader; class Schema; class Table; @@ -39,6 +40,7 @@ namespace parquet { class FileMetaData; class ParquetFileReader; class ReaderProperties; +class SchemaDescriptor; namespace arrow { @@ -171,11 +173,6 @@ class PARQUET_EXPORT FileReader { /// \brief Return arrow schema for all the columns. virtual ::arrow::Status GetSchema(std::shared_ptr<::arrow::Schema>* out) = 0; - /// \brief Return arrow schema by apply selection of column indices. - /// \returns error status if passed wrong indices. - virtual ::arrow::Status GetSchema(const std::vector& indices, - std::shared_ptr<::arrow::Schema>* out) = 0; - // Read column as a whole into an Array. 
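
Since a column may span several row groups, the column-reading APIs below return a ChunkedArray rather than a single contiguous Array. A usage sketch, assuming a FileReader instance named file_reader obtained from OpenFile:

    std::shared_ptr<::arrow::ChunkedArray> column;
    RETURN_NOT_OK(file_reader->ReadColumn(/*i=*/0, &column));
    // Often one chunk per row group, but callers should not rely on any
    // particular chunking.
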
virtual ::arrow::Status ReadColumn(int i, std::shared_ptr<::arrow::ChunkedArray>* out) = 0; @@ -311,6 +308,17 @@ ::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& f const ArrowReaderProperties& properties, std::unique_ptr* reader); +PARQUET_EXPORT +::arrow::Status FromParquetSchema( + const SchemaDescriptor* parquet_schema, const ArrowReaderProperties& properties, + const std::shared_ptr& key_value_metadata, + std::shared_ptr<::arrow::Schema>* out); + +PARQUET_EXPORT +::arrow::Status FromParquetSchema(const SchemaDescriptor* parquet_schema, + const ArrowReaderProperties& properties, + std::shared_ptr<::arrow::Schema>* out); + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index de001736a23..f41e8753c28 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -22,18 +22,24 @@ #include #include #include +#include #include +#include #include +#include + #include "arrow/array.h" +#include "arrow/builder.h" #include "arrow/compute/kernel.h" #include "arrow/table.h" #include "arrow/type.h" -#include "arrow/type_traits.h" +#include "arrow/util/checked_cast.h" #include "arrow/util/int-util.h" #include "arrow/util/logging.h" #include "arrow/util/ubsan.h" +#include "parquet/arrow/reader.h" #include "parquet/column_reader.h" #include "parquet/platform.h" #include "parquet/schema.h" @@ -55,10 +61,15 @@ using arrow::TimestampArray; using arrow::compute::Datum; using ::arrow::BitUtil::FromBigEndian; +using ::arrow::internal::checked_cast; using ::arrow::internal::SafeLeftShift; using ::arrow::util::SafeLoadAs; using parquet::internal::RecordReader; +using parquet::schema::GroupNode; +using parquet::schema::Node; +using parquet::schema::PrimitiveNode; +using ParquetType = parquet::Type; namespace parquet { namespace arrow { @@ -66,6 +77,497 @@ namespace arrow { template using ArrayType = typename ::arrow::TypeTraits::ArrayType; +// ---------------------------------------------------------------------- +// Schema logic + +static Status MakeArrowDecimal(const LogicalType& logical_type, + std::shared_ptr* out) { + const auto& decimal = checked_cast(logical_type); + *out = ::arrow::decimal(decimal.precision(), decimal.scale()); + return Status::OK(); +} + +static Status MakeArrowInt(const LogicalType& logical_type, + std::shared_ptr* out) { + const auto& integer = checked_cast(logical_type); + switch (integer.bit_width()) { + case 8: + *out = integer.is_signed() ? ::arrow::int8() : ::arrow::uint8(); + break; + case 16: + *out = integer.is_signed() ? ::arrow::int16() : ::arrow::uint16(); + break; + case 32: + *out = integer.is_signed() ? ::arrow::int32() : ::arrow::uint32(); + break; + default: + return Status::TypeError(logical_type.ToString(), + " can not annotate physical type Int32"); + } + return Status::OK(); +} + +static Status MakeArrowInt64(const LogicalType& logical_type, + std::shared_ptr* out) { + const auto& integer = checked_cast(logical_type); + switch (integer.bit_width()) { + case 64: + *out = integer.is_signed() ? 
::arrow::int64() : ::arrow::uint64(); + break; + default: + return Status::TypeError(logical_type.ToString(), + " can not annotate physical type Int64"); + } + return Status::OK(); +} + +static Status MakeArrowTime32(const LogicalType& logical_type, + std::shared_ptr* out) { + const auto& time = checked_cast(logical_type); + switch (time.time_unit()) { + case LogicalType::TimeUnit::MILLIS: + *out = ::arrow::time32(::arrow::TimeUnit::MILLI); + break; + default: + return Status::TypeError(logical_type.ToString(), + " can not annotate physical type Time32"); + } + return Status::OK(); +} + +static Status MakeArrowTime64(const LogicalType& logical_type, + std::shared_ptr* out) { + const auto& time = checked_cast(logical_type); + switch (time.time_unit()) { + case LogicalType::TimeUnit::MICROS: + *out = ::arrow::time64(::arrow::TimeUnit::MICRO); + break; + case LogicalType::TimeUnit::NANOS: + *out = ::arrow::time64(::arrow::TimeUnit::NANO); + break; + default: + return Status::TypeError(logical_type.ToString(), + " can not annotate physical type Time64"); + } + return Status::OK(); +} + +static Status MakeArrowTimestamp(const LogicalType& logical_type, + std::shared_ptr* out) { + const auto& timestamp = checked_cast(logical_type); + const bool utc_normalized = + timestamp.is_from_converted_type() ? false : timestamp.is_adjusted_to_utc(); + static const char* utc_timezone = "UTC"; + switch (timestamp.time_unit()) { + case LogicalType::TimeUnit::MILLIS: + *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc_timezone) + : ::arrow::timestamp(::arrow::TimeUnit::MILLI)); + break; + case LogicalType::TimeUnit::MICROS: + *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc_timezone) + : ::arrow::timestamp(::arrow::TimeUnit::MICRO)); + break; + case LogicalType::TimeUnit::NANOS: + *out = (utc_normalized ? 
::arrow::timestamp(::arrow::TimeUnit::NANO, utc_timezone) + : ::arrow::timestamp(::arrow::TimeUnit::NANO)); + break; + default: + return Status::TypeError("Unrecognized time unit in timestamp logical_type: ", + logical_type.ToString()); + } + return Status::OK(); +} + +static Status FromByteArray(const LogicalType& logical_type, + std::shared_ptr* out) { + switch (logical_type.type()) { + case LogicalType::Type::STRING: + *out = ::arrow::utf8(); + break; + case LogicalType::Type::DECIMAL: + RETURN_NOT_OK(MakeArrowDecimal(logical_type, out)); + break; + case LogicalType::Type::NONE: + case LogicalType::Type::ENUM: + case LogicalType::Type::JSON: + case LogicalType::Type::BSON: + *out = ::arrow::binary(); + break; + default: + return Status::NotImplemented("Unhandled logical logical_type ", + logical_type.ToString(), " for binary array"); + } + return Status::OK(); +} + +static Status FromFLBA(const LogicalType& logical_type, int32_t physical_length, + std::shared_ptr* out) { + switch (logical_type.type()) { + case LogicalType::Type::DECIMAL: + RETURN_NOT_OK(MakeArrowDecimal(logical_type, out)); + break; + case LogicalType::Type::NONE: + case LogicalType::Type::INTERVAL: + case LogicalType::Type::UUID: + *out = ::arrow::fixed_size_binary(physical_length); + break; + default: + return Status::NotImplemented("Unhandled logical logical_type ", + logical_type.ToString(), + " for fixed-length binary array"); + } + + return Status::OK(); +} + +static Status FromInt32(const LogicalType& logical_type, std::shared_ptr* out) { + switch (logical_type.type()) { + case LogicalType::Type::INT: + RETURN_NOT_OK(MakeArrowInt(logical_type, out)); + break; + case LogicalType::Type::DATE: + *out = ::arrow::date32(); + break; + case LogicalType::Type::TIME: + RETURN_NOT_OK(MakeArrowTime32(logical_type, out)); + break; + case LogicalType::Type::DECIMAL: + RETURN_NOT_OK(MakeArrowDecimal(logical_type, out)); + break; + case LogicalType::Type::NONE: + *out = ::arrow::int32(); + break; + default: + return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(), + " for INT32"); + } + return Status::OK(); +} + +static Status FromInt64(const LogicalType& logical_type, std::shared_ptr* out) { + switch (logical_type.type()) { + case LogicalType::Type::INT: + RETURN_NOT_OK(MakeArrowInt64(logical_type, out)); + break; + case LogicalType::Type::DECIMAL: + RETURN_NOT_OK(MakeArrowDecimal(logical_type, out)); + break; + case LogicalType::Type::TIMESTAMP: + RETURN_NOT_OK(MakeArrowTimestamp(logical_type, out)); + break; + case LogicalType::Type::TIME: + RETURN_NOT_OK(MakeArrowTime64(logical_type, out)); + break; + case LogicalType::Type::NONE: + *out = ::arrow::int64(); + break; + default: + return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(), + " for INT64"); + } + return Status::OK(); +} + +Status GetPrimitiveType(const schema::PrimitiveNode& primitive, + std::shared_ptr* out) { + const std::shared_ptr& logical_type = primitive.logical_type(); + if (logical_type->is_invalid() || logical_type->is_null()) { + *out = ::arrow::null(); + return Status::OK(); + } + + switch (primitive.physical_type()) { + case ParquetType::BOOLEAN: + *out = ::arrow::boolean(); + break; + case ParquetType::INT32: + RETURN_NOT_OK(FromInt32(*logical_type, out)); + break; + case ParquetType::INT64: + RETURN_NOT_OK(FromInt64(*logical_type, out)); + break; + case ParquetType::INT96: + *out = ::arrow::timestamp(::arrow::TimeUnit::NANO); + break; + case ParquetType::FLOAT: + *out = ::arrow::float32(); + break; 
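+      // Like FLOAT above, DOUBLE carries no logical type annotation and maps
+      // directly to the corresponding Arrow floating point type.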
+ case ParquetType::DOUBLE: + *out = ::arrow::float64(); + break; + case ParquetType::BYTE_ARRAY: + RETURN_NOT_OK(FromByteArray(*logical_type, out)); + break; + case ParquetType::FIXED_LEN_BYTE_ARRAY: + RETURN_NOT_OK(FromFLBA(*logical_type, primitive.type_length(), out)); + break; + default: { + // PARQUET-1565: This can occur if the file is corrupt + return Status::IOError("Invalid physical column type: ", + TypeToString(primitive.physical_type())); + } + } + return Status::OK(); +} + +struct SchemaTreeContext { + SchemaManifest* manifest; + ArrowReaderProperties properties; + const SchemaDescriptor* schema; + + void LinkParent(const SchemaField* child, const SchemaField* parent) { + manifest->child_to_parent[child] = parent; + } + + void RecordLeaf(const SchemaField* leaf) { + manifest->column_index_to_field[leaf->column_index] = leaf; + } +}; + +Status GetTypeForNode(int column_index, const schema::PrimitiveNode& primitive_node, + SchemaTreeContext* ctx, std::shared_ptr* out) { + std::shared_ptr storage_type; + RETURN_NOT_OK(GetPrimitiveType(primitive_node, &storage_type)); + if (ctx->properties.read_dictionary(column_index)) { + *out = ::arrow::dictionary(::arrow::int32(), storage_type); + } else { + *out = storage_type; + } + return Status::OK(); +} + +Status NodeToSchemaField(const Node& node, int16_t max_def_level, int16_t max_rep_level, + SchemaTreeContext* ctx, const SchemaField* parent, + SchemaField* out); + +Status GroupToSchemaField(const GroupNode& node, int16_t max_def_level, + int16_t max_rep_level, SchemaTreeContext* ctx, + const SchemaField* parent, SchemaField* out); + +Status PopulateLeaf(int column_index, const std::shared_ptr& field, + int16_t max_def_level, int16_t max_rep_level, SchemaTreeContext* ctx, + const SchemaField* parent, SchemaField* out) { + out->field = field; + out->column_index = column_index; + out->max_definition_level = max_def_level; + out->max_repetition_level = max_rep_level; + ctx->RecordLeaf(out); + ctx->LinkParent(out, parent); + return Status::OK(); +} + +// Special case mentioned in the format spec: +// If the name is array or ends in _tuple, this should be a list of struct +// even for single child elements. 
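+// For example, under this rule
+//
+//   repeated group files_tuple { required binary path; }
+//
+// is read as list<struct<path: binary>> rather than list<binary>.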
+bool HasStructListName(const GroupNode& node) {
+  return node.name() == "array" || boost::algorithm::ends_with(node.name(), "_tuple");
+}
+
+Status GroupToStruct(const GroupNode& node, int16_t max_def_level, int16_t max_rep_level,
+                     SchemaTreeContext* ctx, const SchemaField* parent,
+                     SchemaField* out) {
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+  out->children.resize(node.field_count());
+  for (int i = 0; i < node.field_count(); i++) {
+    RETURN_NOT_OK(NodeToSchemaField(*node.field(i), max_def_level, max_rep_level, ctx,
+                                    out, &out->children[i]));
+    arrow_fields.push_back(out->children[i].field);
+  }
+  auto struct_type = ::arrow::struct_(arrow_fields);
+  out->field = ::arrow::field(node.name(), struct_type, node.is_optional());
+  out->max_definition_level = max_def_level;
+  out->max_repetition_level = max_rep_level;
+  return Status::OK();
+}
+
+Status ListToSchemaField(const GroupNode& group, int16_t max_def_level,
+                         int16_t max_rep_level, SchemaTreeContext* ctx,
+                         const SchemaField* parent, SchemaField* out) {
+  if (group.field_count() != 1) {
+    return Status::NotImplemented(
+        "Only LIST-annotated groups with a single child can be handled.");
+  }
+
+  out->children.resize(1);
+  SchemaField* child_field = &out->children[0];
+
+  ctx->LinkParent(out, parent);
+  ctx->LinkParent(child_field, out);
+
+  const Node& list_node = *group.field(0);
+
+  if (!list_node.is_repeated()) {
+    return Status::NotImplemented(
+        "Non-repeated nodes in a LIST-annotated group are not supported.");
+  }
+
+  ++max_def_level;
+  ++max_rep_level;
+  if (list_node.is_group()) {
+    // Resolve 3-level encoding
+    //
+    // required/optional group name=whatever {
+    //   repeated group name=list {
+    //     required/optional TYPE item;
+    //   }
+    // }
+    //
+    // yields list<TYPE> ?nullable
+    //
+    // We distinguish the special case that we have
+    //
+    // required/optional group name=whatever {
+    //   repeated group name=array or $SOMETHING_tuple {
+    //     required/optional TYPE item;
+    //   }
+    // }
+    //
+    // In this latter case, the inner type of the list should be a struct
+    // rather than a primitive value
+    //
+    // yields list<struct<item: TYPE> not null> ?nullable
+    const auto& list_group = static_cast<const GroupNode&>(list_node);
+    // Special case mentioned in the format spec:
+    // If the name is array or ends in _tuple, this should be a list of struct
+    // even for single child elements.
+ if (list_group.field_count() == 1 && !HasStructListName(list_group)) { + // List of primitive type + RETURN_NOT_OK(NodeToSchemaField(*list_group.field(0), max_def_level, max_rep_level, + ctx, out, child_field)); + } else { + RETURN_NOT_OK( + GroupToStruct(list_group, max_def_level, max_rep_level, ctx, out, child_field)); + } + } else { + // Two-level list encoding + // + // required/optional group LIST { + // repeated TYPE; + // } + const auto& primitive_node = static_cast(list_node); + int column_index = ctx->schema->GetColumnIndex(primitive_node); + std::shared_ptr type; + RETURN_NOT_OK(GetTypeForNode(column_index, primitive_node, ctx, &type)); + auto item_field = ::arrow::field(list_node.name(), type, /*nullable=*/false); + RETURN_NOT_OK(PopulateLeaf(column_index, item_field, max_def_level, max_rep_level, + ctx, out, child_field)); + } + out->field = ::arrow::field(group.name(), ::arrow::list(child_field->field), + group.is_optional()); + out->max_definition_level = max_def_level; + out->max_repetition_level = max_rep_level; + return Status::OK(); +} + +Status GroupToSchemaField(const GroupNode& node, int16_t max_def_level, + int16_t max_rep_level, SchemaTreeContext* ctx, + const SchemaField* parent, SchemaField* out) { + if (node.logical_type()->is_list()) { + return ListToSchemaField(node, max_def_level, max_rep_level, ctx, parent, out); + } + std::shared_ptr type; + if (node.is_repeated()) { + // Simple repeated struct + // + // repeated group $NAME { + // r/o TYPE[0] f0 + // r/o TYPE[1] f1 + // } + out->children.resize(1); + RETURN_NOT_OK( + GroupToStruct(node, max_def_level, max_rep_level, ctx, out, &out->children[0])); + out->field = ::arrow::field(node.name(), ::arrow::list(out->children[0].field), + node.is_optional()); + out->max_definition_level = max_def_level; + out->max_repetition_level = max_rep_level; + return Status::OK(); + } else { + return GroupToStruct(node, max_def_level, max_rep_level, ctx, parent, out); + } +} + +Status NodeToSchemaField(const Node& node, int16_t max_def_level, int16_t max_rep_level, + SchemaTreeContext* ctx, const SchemaField* parent, + SchemaField* out) { + if (node.is_optional()) { + ++max_def_level; + } else if (node.is_repeated()) { + // Repeated fields add a definition level. This is used to distinguish + // between an empty list and a list with an item in it. + ++max_rep_level; + ++max_def_level; + } + + ctx->LinkParent(out, parent); + + // Now, walk the schema and create a ColumnDescriptor for each leaf node + if (node.is_group()) { + return GroupToSchemaField(static_cast(node), max_def_level, + max_rep_level, ctx, parent, out); + } else { + const auto& primitive_node = static_cast(node); + int column_index = ctx->schema->GetColumnIndex(primitive_node); + std::shared_ptr type; + RETURN_NOT_OK(GetTypeForNode(column_index, primitive_node, ctx, &type)); + if (node.is_repeated()) { + // One-level list encoding, e.g. + // a: repeated int32; + out->children.resize(1); + auto child_field = ::arrow::field(node.name(), type, /*nullable=*/false); + RETURN_NOT_OK(PopulateLeaf(column_index, child_field, max_def_level, max_rep_level, + ctx, out, &out->children[0])); + + out->field = ::arrow::field(node.name(), ::arrow::list(child_field), + /*nullable=*/false); + // Is this right? 
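+      // With one-level encoding the repeated leaf itself carries the list's
+      // levels, so the synthesized list field and its child share the same
+      // max_def_level/max_rep_level here.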
+ out->max_definition_level = max_def_level; + out->max_repetition_level = max_rep_level; + return Status::OK(); + } else { + return PopulateLeaf(column_index, + ::arrow::field(node.name(), type, node.is_optional()), + max_def_level, max_rep_level, ctx, parent, out); + } + } +} + +Status BuildSchemaManifest(const SchemaDescriptor* schema, + const ArrowReaderProperties& properties, + SchemaManifest* manifest) { + SchemaTreeContext ctx; + ctx.manifest = manifest; + ctx.properties = properties; + ctx.schema = schema; + const GroupNode& schema_node = *schema->group_node(); + manifest->descr = schema; + manifest->schema_fields.resize(schema_node.field_count()); + for (int i = 0; i < static_cast(schema_node.field_count()); ++i) { + RETURN_NOT_OK(NodeToSchemaField(*schema_node.field(i), 0, 0, &ctx, + /*parent=*/nullptr, &manifest->schema_fields[i])); + } + return Status::OK(); +} + +Status FromParquetSchema( + const SchemaDescriptor* schema, const ArrowReaderProperties& properties, + const std::shared_ptr& key_value_metadata, + std::shared_ptr<::arrow::Schema>* out) { + SchemaManifest manifest; + RETURN_NOT_OK(BuildSchemaManifest(schema, properties, &manifest)); + std::vector> fields(manifest.schema_fields.size()); + for (int i = 0; i < static_cast(fields.size()); i++) { + fields[i] = manifest.schema_fields[i].field; + } + *out = ::arrow::schema(fields, key_value_metadata); + return Status::OK(); +} + +Status FromParquetSchema(const SchemaDescriptor* parquet_schema, + const ArrowReaderProperties& properties, + std::shared_ptr<::arrow::Schema>* out) { + return FromParquetSchema(parquet_schema, properties, nullptr, out); +} + // ---------------------------------------------------------------------- // Primitive types @@ -183,11 +685,8 @@ Status TransferDictionary(RecordReader* reader, auto dict_reader = dynamic_cast(reader); DCHECK(dict_reader); *out = dict_reader->GetResult(); - - const auto& dict_type = static_cast(*(*out)->type()); - if (!logical_value_type->Equals(*dict_type.value_type())) { - *out = CastChunksTo(**out, - ::arrow::dictionary(dict_type.index_type(), logical_value_type)); + if (!logical_value_type->Equals(*(*out)->type())) { + *out = CastChunksTo(**out, logical_value_type); } return Status::OK(); } @@ -196,7 +695,8 @@ Status TransferBinary(RecordReader* reader, const std::shared_ptr& logical_value_type, std::shared_ptr* out) { if (reader->read_dictionary()) { - return TransferDictionary(reader, logical_value_type, out); + return TransferDictionary( + reader, ::arrow::dictionary(::arrow::int32(), logical_value_type), out); } auto binary_reader = dynamic_cast(reader); DCHECK(binary_reader); @@ -519,7 +1019,12 @@ Status TransferColumnData(internal::RecordReader* reader, const ColumnDescriptor* descr, MemoryPool* pool, std::shared_ptr* out) { Datum result; + std::shared_ptr chunked_result; switch (value_type->id()) { + case ::arrow::Type::DICTIONARY: { + RETURN_NOT_OK(TransferDictionary(reader, value_type, &chunked_result)); + result = chunked_result; + } break; case ::arrow::Type::NA: { result = std::make_shared<::arrow::NullArray>(reader->values_written()); break; @@ -548,9 +1053,8 @@ Status TransferColumnData(internal::RecordReader* reader, case ::arrow::Type::FIXED_SIZE_BINARY: case ::arrow::Type::BINARY: case ::arrow::Type::STRING: { - std::shared_ptr out; - RETURN_NOT_OK(TransferBinary(reader, value_type, &out)); - result = out; + RETURN_NOT_OK(TransferBinary(reader, value_type, &chunked_result)); + result = chunked_result; } break; case ::arrow::Type::DECIMAL: { switch 
(descr->physical_type()) { @@ -612,5 +1116,114 @@ Status TransferColumnData(internal::RecordReader* reader, return Status::OK(); } +Status ReconstructNestedList(const std::shared_ptr& arr, + std::shared_ptr field, int16_t max_def_level, + int16_t max_rep_level, const int16_t* def_levels, + const int16_t* rep_levels, int64_t total_levels, + ::arrow::MemoryPool* pool, std::shared_ptr* out) { + // Walk downwards to extract nullability + std::vector nullable; + std::vector> offset_builders; + std::vector> valid_bits_builders; + nullable.push_back(field->nullable()); + while (field->type()->num_children() > 0) { + if (field->type()->num_children() > 1) { + return Status::NotImplemented("Fields with more than one child are not supported."); + } else { + if (field->type()->id() != ::arrow::Type::LIST) { + return Status::NotImplemented("Currently only nesting with Lists is supported."); + } + field = field->type()->child(0); + } + offset_builders.emplace_back( + std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool)); + valid_bits_builders.emplace_back( + std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool)); + nullable.push_back(field->nullable()); + } + + int64_t list_depth = offset_builders.size(); + // This describes the minimal definition that describes a level that + // reflects a value in the primitive values array. + int16_t values_def_level = max_def_level; + if (nullable[nullable.size() - 1]) { + values_def_level--; + } + + // The definition levels that are needed so that a list is declared + // as empty and not null. + std::vector empty_def_level(list_depth); + int def_level = 0; + for (int i = 0; i < list_depth; i++) { + if (nullable[i]) { + def_level++; + } + empty_def_level[i] = static_cast(def_level); + def_level++; + } + + int32_t values_offset = 0; + std::vector null_counts(list_depth, 0); + for (int64_t i = 0; i < total_levels; i++) { + int16_t rep_level = rep_levels[i]; + if (rep_level < max_rep_level) { + for (int64_t j = rep_level; j < list_depth; j++) { + if (j == (list_depth - 1)) { + RETURN_NOT_OK(offset_builders[j]->Append(values_offset)); + } else { + RETURN_NOT_OK(offset_builders[j]->Append( + static_cast(offset_builders[j + 1]->length()))); + } + + if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) { + RETURN_NOT_OK(valid_bits_builders[j]->Append(false)); + null_counts[j]++; + break; + } else { + RETURN_NOT_OK(valid_bits_builders[j]->Append(true)); + if (empty_def_level[j] == def_levels[i]) { + break; + } + } + } + } + if (def_levels[i] >= values_def_level) { + values_offset++; + } + } + // Add the final offset to all lists + for (int64_t j = 0; j < list_depth; j++) { + if (j == (list_depth - 1)) { + RETURN_NOT_OK(offset_builders[j]->Append(values_offset)); + } else { + RETURN_NOT_OK(offset_builders[j]->Append( + static_cast(offset_builders[j + 1]->length()))); + } + } + + std::vector> offsets; + std::vector> valid_bits; + std::vector list_lengths; + for (int64_t j = 0; j < list_depth; j++) { + list_lengths.push_back(offset_builders[j]->length() - 1); + std::shared_ptr array; + RETURN_NOT_OK(offset_builders[j]->Finish(&array)); + offsets.emplace_back(std::static_pointer_cast(array)->values()); + RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array)); + valid_bits.emplace_back(std::static_pointer_cast(array)->values()); + } + + *out = arr; + + // TODO(wesm): Use passed-in field + for (int64_t j = list_depth - 1; j >= 0; j--) { + auto list_type = + ::arrow::list(::arrow::field("item", (*out)->type(), nullable[j + 1])); + *out = 
std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j], + *out, valid_bits[j], null_counts[j]); + } + return Status::OK(); +} + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h index 5c0fa5ec315..cbf44ecb5f6 100644 --- a/cpp/src/parquet/arrow/reader_internal.h +++ b/cpp/src/parquet/arrow/reader_internal.h @@ -17,17 +17,32 @@ #pragma once +#include #include +#include +#include +#include + +#include "arrow/status.h" + +#include "parquet/column_reader.h" +#include "parquet/file_reader.h" +#include "parquet/metadata.h" +#include "parquet/schema.h" namespace arrow { +class Array; class ChunkedArray; class DataType; +class Field; class MemoryPool; -class Status; +class Schema; } // namespace arrow +using arrow::Status; + namespace parquet { class ColumnDescriptor; @@ -40,11 +55,145 @@ class RecordReader; namespace arrow { -::arrow::Status TransferColumnData(internal::RecordReader* reader, - std::shared_ptr<::arrow::DataType> value_type, - const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool, - std::shared_ptr<::arrow::ChunkedArray>* out); +class ArrowReaderProperties; +class ColumnReaderImpl; + +// ---------------------------------------------------------------------- +// Iteration utilities + +// Abstraction to decouple row group iteration details from the ColumnReader, +// so we can read only a single row group if we want +class FileColumnIterator { + public: + explicit FileColumnIterator(int column_index, ParquetFileReader* reader, + std::vector row_groups) + : column_index_(column_index), + reader_(reader), + schema_(reader->metadata()->schema()), + row_groups_(row_groups.begin(), row_groups.end()) {} + + virtual ~FileColumnIterator() {} + + std::unique_ptr<::parquet::PageReader> NextChunk() { + if (row_groups_.empty()) { + return nullptr; + } + + auto row_group_reader = reader_->RowGroup(row_groups_.front()); + row_groups_.pop_front(); + return row_group_reader->GetColumnPageReader(column_index_); + } + + const SchemaDescriptor* schema() const { return schema_; } + + const ColumnDescriptor* descr() const { return schema_->Column(column_index_); } + + std::shared_ptr metadata() const { return reader_->metadata(); } + + int column_index() const { return column_index_; } + + protected: + int column_index_; + ParquetFileReader* reader_; + const SchemaDescriptor* schema_; + std::deque row_groups_; +}; + +using FileColumnIteratorFactory = + std::function; + +Status TransferColumnData(::parquet::internal::RecordReader* reader, + std::shared_ptr<::arrow::DataType> value_type, + const ColumnDescriptor* descr, ::arrow::MemoryPool* pool, + std::shared_ptr<::arrow::ChunkedArray>* out); + +Status ReconstructNestedList(const std::shared_ptr<::arrow::Array>& arr, + std::shared_ptr<::arrow::Field> field, int16_t max_def_level, + int16_t max_rep_level, const int16_t* def_levels, + const int16_t* rep_levels, int64_t total_levels, + ::arrow::MemoryPool* pool, + std::shared_ptr<::arrow::Array>* out); + +struct ReaderContext { + ParquetFileReader* reader; + ::arrow::MemoryPool* pool; + FileColumnIteratorFactory iterator_factory; + bool filter_leaves; + std::unordered_set included_leaves; + + bool IncludesLeaf(int leaf_index) const { + return (!this->filter_leaves || + (included_leaves.find(leaf_index) != included_leaves.end())); + } +}; + +struct PARQUET_EXPORT SchemaField { + std::shared_ptr<::arrow::Field> field; + std::vector children; + + // Only set for leaf nodes + int column_index = -1; + 
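+  // Maximum definition and repetition levels at this node, accumulated while
+  // walking the Parquet schema tree (see NodeToSchemaField in
+  // reader_internal.cc).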
+ int16_t max_definition_level; + int16_t max_repetition_level; + + bool is_leaf() const { return column_index != -1; } + + Status GetReader(const ReaderContext& context, + std::unique_ptr* out) const; +}; + +struct SchemaManifest { + const SchemaDescriptor* descr; + std::vector schema_fields; + + std::unordered_map column_index_to_field; + std::unordered_map child_to_parent; + + Status GetColumnField(int column_index, const SchemaField** out) const { + auto it = column_index_to_field.find(column_index); + if (it == column_index_to_field.end()) { + return Status::KeyError("Column index ", column_index, + " not found in schema manifest, may be malformed"); + } + *out = it->second; + return Status::OK(); + } + + const SchemaField* GetParent(const SchemaField* field) const { + // Returns nullptr also if not found + auto it = child_to_parent.find(field); + if (it == child_to_parent.end()) { + return nullptr; + } + return it->second; + } + + bool GetFieldIndices(const std::vector& column_indices, std::vector* out) { + // Coalesce a list of schema fields indices which are the roots of the + // columns referred by a list of column indices + const schema::GroupNode* group = descr->group_node(); + std::unordered_set already_added; + out->clear(); + for (auto& column_idx : column_indices) { + auto field_node = descr->GetColumnRoot(column_idx); + auto field_idx = group->FieldIndex(*field_node); + if (field_idx < 0) { + return false; + } + auto insertion = already_added.insert(field_idx); + if (insertion.second) { + out->push_back(field_idx); + } + } + return true; + } +}; + +PARQUET_EXPORT +Status BuildSchemaManifest(const SchemaDescriptor* schema, + const ArrowReaderProperties& properties, + SchemaManifest* manifest); } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 6404338e157..82c85663980 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -53,446 +53,8 @@ namespace parquet { namespace arrow { -const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI); -const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO); -const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO); - -static Status MakeArrowDecimal(const LogicalType& logical_type, - std::shared_ptr* out) { - const auto& decimal = checked_cast(logical_type); - *out = ::arrow::decimal(decimal.precision(), decimal.scale()); - return Status::OK(); -} - -static Status MakeArrowInt(const LogicalType& logical_type, - std::shared_ptr* out) { - const auto& integer = checked_cast(logical_type); - switch (integer.bit_width()) { - case 8: - *out = integer.is_signed() ? ::arrow::int8() : ::arrow::uint8(); - break; - case 16: - *out = integer.is_signed() ? ::arrow::int16() : ::arrow::uint16(); - break; - case 32: - *out = integer.is_signed() ? ::arrow::int32() : ::arrow::uint32(); - break; - default: - return Status::TypeError(logical_type.ToString(), - " can not annotate physical type Int32"); - } - return Status::OK(); -} - -static Status MakeArrowInt64(const LogicalType& logical_type, - std::shared_ptr* out) { - const auto& integer = checked_cast(logical_type); - switch (integer.bit_width()) { - case 64: - *out = integer.is_signed() ? 
::arrow::int64() : ::arrow::uint64(); - break; - default: - return Status::TypeError(logical_type.ToString(), - " can not annotate physical type Int64"); - } - return Status::OK(); -} - -static Status MakeArrowTime32(const LogicalType& logical_type, - std::shared_ptr* out) { - const auto& time = checked_cast(logical_type); - switch (time.time_unit()) { - case LogicalType::TimeUnit::MILLIS: - *out = ::arrow::time32(::arrow::TimeUnit::MILLI); - break; - default: - return Status::TypeError(logical_type.ToString(), - " can not annotate physical type Time32"); - } - return Status::OK(); -} - -static Status MakeArrowTime64(const LogicalType& logical_type, - std::shared_ptr* out) { - const auto& time = checked_cast(logical_type); - switch (time.time_unit()) { - case LogicalType::TimeUnit::MICROS: - *out = ::arrow::time64(::arrow::TimeUnit::MICRO); - break; - case LogicalType::TimeUnit::NANOS: - *out = ::arrow::time64(::arrow::TimeUnit::NANO); - break; - default: - return Status::TypeError(logical_type.ToString(), - " can not annotate physical type Time64"); - } - return Status::OK(); -} - -static Status MakeArrowTimestamp(const LogicalType& logical_type, - std::shared_ptr* out) { - const auto& timestamp = checked_cast(logical_type); - const bool utc_normalized = - timestamp.is_from_converted_type() ? false : timestamp.is_adjusted_to_utc(); - static const char* utc_timezone = "UTC"; - switch (timestamp.time_unit()) { - case LogicalType::TimeUnit::MILLIS: - *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc_timezone) - : ::arrow::timestamp(::arrow::TimeUnit::MILLI)); - break; - case LogicalType::TimeUnit::MICROS: - *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc_timezone) - : ::arrow::timestamp(::arrow::TimeUnit::MICRO)); - break; - case LogicalType::TimeUnit::NANOS: - *out = (utc_normalized ? 
::arrow::timestamp(::arrow::TimeUnit::NANO, utc_timezone) - : ::arrow::timestamp(::arrow::TimeUnit::NANO)); - break; - default: - return Status::TypeError("Unrecognized time unit in timestamp logical_type: ", - logical_type.ToString()); - } - return Status::OK(); -} - -static Status FromByteArray(const LogicalType& logical_type, - std::shared_ptr* out) { - switch (logical_type.type()) { - case LogicalType::Type::STRING: - *out = ::arrow::utf8(); - break; - case LogicalType::Type::DECIMAL: - RETURN_NOT_OK(MakeArrowDecimal(logical_type, out)); - break; - case LogicalType::Type::NONE: - case LogicalType::Type::ENUM: - case LogicalType::Type::JSON: - case LogicalType::Type::BSON: - *out = ::arrow::binary(); - break; - default: - return Status::NotImplemented("Unhandled logical logical_type ", - logical_type.ToString(), " for binary array"); - } - return Status::OK(); -} - -static Status FromFLBA(const LogicalType& logical_type, int32_t physical_length, - std::shared_ptr* out) { - switch (logical_type.type()) { - case LogicalType::Type::DECIMAL: - RETURN_NOT_OK(MakeArrowDecimal(logical_type, out)); - break; - case LogicalType::Type::NONE: - case LogicalType::Type::INTERVAL: - case LogicalType::Type::UUID: - *out = ::arrow::fixed_size_binary(physical_length); - break; - default: - return Status::NotImplemented("Unhandled logical logical_type ", - logical_type.ToString(), - " for fixed-length binary array"); - } - - return Status::OK(); -} - -static Status FromInt32(const LogicalType& logical_type, - std::shared_ptr* out) { - switch (logical_type.type()) { - case LogicalType::Type::INT: - RETURN_NOT_OK(MakeArrowInt(logical_type, out)); - break; - case LogicalType::Type::DATE: - *out = ::arrow::date32(); - break; - case LogicalType::Type::TIME: - RETURN_NOT_OK(MakeArrowTime32(logical_type, out)); - break; - case LogicalType::Type::DECIMAL: - RETURN_NOT_OK(MakeArrowDecimal(logical_type, out)); - break; - case LogicalType::Type::NONE: - *out = ::arrow::int32(); - break; - default: - return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(), - " for INT32"); - } - return Status::OK(); -} - -static Status FromInt64(const LogicalType& logical_type, - std::shared_ptr* out) { - switch (logical_type.type()) { - case LogicalType::Type::INT: - RETURN_NOT_OK(MakeArrowInt64(logical_type, out)); - break; - case LogicalType::Type::DECIMAL: - RETURN_NOT_OK(MakeArrowDecimal(logical_type, out)); - break; - case LogicalType::Type::TIMESTAMP: - RETURN_NOT_OK(MakeArrowTimestamp(logical_type, out)); - break; - case LogicalType::Type::TIME: - RETURN_NOT_OK(MakeArrowTime64(logical_type, out)); - break; - case LogicalType::Type::NONE: - *out = ::arrow::int64(); - break; - default: - return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(), - " for INT64"); - } - return Status::OK(); -} - -Status FromPrimitive(const PrimitiveNode& primitive, std::shared_ptr* out) { - const std::shared_ptr& logical_type = primitive.logical_type(); - if (logical_type->is_invalid() || logical_type->is_null()) { - *out = ::arrow::null(); - return Status::OK(); - } - - switch (primitive.physical_type()) { - case ParquetType::BOOLEAN: - *out = ::arrow::boolean(); - break; - case ParquetType::INT32: - RETURN_NOT_OK(FromInt32(*logical_type, out)); - break; - case ParquetType::INT64: - RETURN_NOT_OK(FromInt64(*logical_type, out)); - break; - case ParquetType::INT96: - *out = TIMESTAMP_NS; - break; - case ParquetType::FLOAT: - *out = ::arrow::float32(); - break; - case ParquetType::DOUBLE: - *out = 
::arrow::float64(); - break; - case ParquetType::BYTE_ARRAY: - RETURN_NOT_OK(FromByteArray(*logical_type, out)); - break; - case ParquetType::FIXED_LEN_BYTE_ARRAY: - RETURN_NOT_OK(FromFLBA(*logical_type, primitive.type_length(), out)); - break; - default: { - // PARQUET-1565: This can occur if the file is corrupt - return Status::IOError("Invalid physical column type: ", - TypeToString(primitive.physical_type())); - } - } - return Status::OK(); -} - -// Forward declaration -Status NodeToFieldInternal(const Node& node, - const std::unordered_set* included_leaf_nodes, - std::shared_ptr* out); - -/* - * Auxilary function to test if a parquet schema node is a leaf node - * that should be included in a resulting arrow schema - */ -inline bool IsIncludedLeaf(const Node& node, - const std::unordered_set* included_leaf_nodes) { - if (included_leaf_nodes == nullptr) { - return true; - } - auto search = included_leaf_nodes->find(&node); - return (search != included_leaf_nodes->end()); -} - -Status StructFromGroup(const GroupNode& group, - const std::unordered_set* included_leaf_nodes, - std::shared_ptr* out) { - std::vector> fields; - std::shared_ptr field; - - *out = nullptr; - - for (int i = 0; i < group.field_count(); i++) { - RETURN_NOT_OK(NodeToFieldInternal(*group.field(i), included_leaf_nodes, &field)); - if (field != nullptr) { - fields.push_back(field); - } - } - if (fields.size() > 0) { - *out = std::make_shared<::arrow::StructType>(fields); - } - return Status::OK(); -} - -Status NodeToList(const GroupNode& group, - const std::unordered_set* included_leaf_nodes, - std::shared_ptr* out) { - *out = nullptr; - if (group.field_count() == 1) { - // This attempts to resolve the preferred 3-level list encoding. - const Node& list_node = *group.field(0); - if (list_node.is_group() && list_node.is_repeated()) { - const auto& list_group = static_cast(list_node); - // Special case mentioned in the format spec: - // If the name is array or ends in _tuple, this should be a list of struct - // even for single child elements. 
-Status NodeToList(const GroupNode& group,
-                  const std::unordered_set<const Node*>* included_leaf_nodes,
-                  std::shared_ptr<ArrowType>* out) {
-  *out = nullptr;
-  if (group.field_count() == 1) {
-    // This attempts to resolve the preferred 3-level list encoding.
-    const Node& list_node = *group.field(0);
-    if (list_node.is_group() && list_node.is_repeated()) {
-      const auto& list_group = static_cast<const GroupNode&>(list_node);
-      // Special case mentioned in the format spec:
-      //   If the name is array or ends in _tuple, this should be a list of struct
-      //   even for single child elements.
-      if (list_group.field_count() == 1 && !schema::HasStructListName(list_group)) {
-        // List of primitive type
-        std::shared_ptr<Field> item_field;
-        RETURN_NOT_OK(
-            NodeToFieldInternal(*list_group.field(0), included_leaf_nodes, &item_field));
-
-        if (item_field != nullptr) {
-          *out = ::arrow::list(item_field);
-        }
-      } else {
-        // List of struct
-        std::shared_ptr<ArrowType> inner_type;
-        RETURN_NOT_OK(StructFromGroup(list_group, included_leaf_nodes, &inner_type));
-        if (inner_type != nullptr) {
-          auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
-          *out = ::arrow::list(item_field);
-        }
-      }
-    } else if (list_node.is_repeated()) {
-      // repeated primitive node
-      std::shared_ptr<ArrowType> inner_type;
-      if (IsIncludedLeaf(static_cast<const Node&>(list_node), included_leaf_nodes)) {
-        RETURN_NOT_OK(
-            FromPrimitive(static_cast<const PrimitiveNode&>(list_node), &inner_type));
-        auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
-        *out = ::arrow::list(item_field);
-      }
-    } else {
-      return Status::NotImplemented(
-          "Non-repeated groups in a LIST-annotated group are not supported.");
-    }
-  } else {
-    return Status::NotImplemented(
-        "Only LIST-annotated groups with a single child can be handled.");
-  }
-  return Status::OK();
-}
-
-Status NodeToField(const Node& node, std::shared_ptr<Field>* out) {
-  return NodeToFieldInternal(node, nullptr, out);
-}
-
-Status NodeToFieldInternal(const Node& node,
-                           const std::unordered_set<const Node*>* included_leaf_nodes,
-                           std::shared_ptr<Field>* out) {
-  std::shared_ptr<ArrowType> type = nullptr;
-  bool nullable = !node.is_required();
-
-  *out = nullptr;
-
-  if (node.is_repeated()) {
-    // 1-level LIST encoding fields are required
-    std::shared_ptr<ArrowType> inner_type;
-    if (node.is_group()) {
-      RETURN_NOT_OK(StructFromGroup(static_cast<const GroupNode&>(node),
-                                    included_leaf_nodes, &inner_type));
-    } else if (IsIncludedLeaf(node, included_leaf_nodes)) {
-      RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &inner_type));
-    }
-    if (inner_type != nullptr) {
-      auto item_field = std::make_shared<Field>(node.name(), inner_type, false);
-      type = ::arrow::list(item_field);
-      nullable = false;
-    }
-  } else if (node.is_group()) {
-    const auto& group = static_cast<const GroupNode&>(node);
-    if (node.logical_type()->is_list()) {
-      RETURN_NOT_OK(NodeToList(group, included_leaf_nodes, &type));
-    } else {
-      RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &type));
-    }
-  } else {
-    // Primitive (leaf) node
-    if (IsIncludedLeaf(node, included_leaf_nodes)) {
-      RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &type));
-    }
-  }
-  if (type != nullptr) {
-    *out = std::make_shared<Field>(node.name(), type, nullable);
-  }
-  return Status::OK();
-}
-
-std::shared_ptr<Field> ToDictionary32(const Field& field) {
-  auto new_ty = ::arrow::dictionary(::arrow::int32(), field.type());
-  return field.WithType(new_ty);
-}
-
-Status FromParquetSchema(
-    const SchemaDescriptor* parquet_schema, const ArrowReaderProperties& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
-    std::shared_ptr<::arrow::Schema>* out) {
-  const GroupNode& schema_node = *parquet_schema->group_node();
-
-  int num_fields = static_cast<int>(schema_node.field_count());
-  std::vector<std::shared_ptr<Field>> fields(num_fields);
-  for (int i = 0; i < num_fields; i++) {
-    RETURN_NOT_OK(NodeToField(*schema_node.field(i), &fields[i]));
-  }
-  *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
-  return Status::OK();
-}
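The overload removed next is the selective conversion that the deleted select_schema bindings were built on. Its deduplication and ordering rules, sketched against a hypothetical schema (illustrative only):

    // Leaf columns: 0 under root "a"; 1 and 2 under root "b".
    // column_indices = {1, 0, 2} includes leaves {1, 0, 2} and emits roots in
    // first-appearance order, so the converted schema's fields are {b, a}.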
-Status FromParquetSchema(
-    const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
-    const ArrowReaderProperties& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
-    std::shared_ptr<::arrow::Schema>* out) {
-  // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
-  // from the root Parquet node
-
-  // Put the right leaf nodes in an unordered set
-  // Indices in column_indices should be unique; duplicate indices are merged
-  // into one, keeping the order of first appearance.
-  int num_columns = static_cast<int>(column_indices.size());
-  std::unordered_set<const Node*> top_nodes;  // to deduplicate the top nodes
-  std::vector<const Node*> base_nodes;        // to keep the ordering
-  std::unordered_set<const Node*> included_leaf_nodes(num_columns);
-  for (int i = 0; i < num_columns; i++) {
-    const ColumnDescriptor* column_desc = parquet_schema->Column(column_indices[i]);
-    const Node* node = column_desc->schema_node().get();
-
-    included_leaf_nodes.insert(node);
-    const Node* column_root = parquet_schema->GetColumnRoot(column_indices[i]);
-    auto it = top_nodes.insert(column_root);
-    if (it.second) {
-      base_nodes.push_back(column_root);
-    }
-  }
-
-  std::vector<std::shared_ptr<Field>> fields;
-  std::shared_ptr<Field> field;
-  for (auto node : base_nodes) {
-    RETURN_NOT_OK(NodeToFieldInternal(*node, &included_leaf_nodes, &field));
-    if (field != nullptr) {
-      fields.push_back(field);
-    }
-  }
-
-  *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
-  return Status::OK();
-}
-
-Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
-                         const std::vector<int>& column_indices,
-                         const ArrowReaderProperties& properties,
-                         std::shared_ptr<::arrow::Schema>* out) {
-  return FromParquetSchema(parquet_schema, column_indices, properties, nullptr, out);
-}
-
-Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
-                         const ArrowReaderProperties& properties,
-                         std::shared_ptr<::arrow::Schema>* out) {
-  return FromParquetSchema(parquet_schema, properties, nullptr, out);
-}
+// ----------------------------------------------------------------------
+// Parquet to Arrow schema conversion
 
 Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name,
                   bool nullable, const WriterProperties& properties,
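Selective schema conversion does not disappear outright; the SchemaManifest machinery in reader_internal.h takes over. A rough sketch of the replacement lookup, using the names that appear later in this diff (descriptor and column_index are hypothetical placeholders; error handling elided):

    SchemaManifest manifest;
    RETURN_NOT_OK(BuildSchemaManifest(descriptor, default_arrow_reader_properties(),
                                      &manifest));
    const SchemaField* schema_field;
    RETURN_NOT_OK(manifest.GetColumnField(column_index, &schema_field));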
diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h
index 1133431fa8d..b3cc66bfe6c 100644
--- a/cpp/src/parquet/arrow/schema.h
+++ b/cpp/src/parquet/arrow/schema.h
@@ -39,49 +39,8 @@ class WriterProperties;
 
 namespace arrow {
 
-class ArrowReaderProperties;
 class ArrowWriterProperties;
 
-PARQUET_EXPORT
-::arrow::Status NodeToField(const schema::Node& node,
-                            std::shared_ptr<::arrow::Field>* out);
-
-/// Convert parquet schema to arrow schema with selected indices
-/// \param parquet_schema to be converted
-/// \param column_indices indices of leaf nodes in the parquet schema tree; the
-///        order of appearance matters for the converted schema, and repeated
-///        indices are ignored except for the first occurrence
-/// \param properties reader options for FileReader
-/// \param key_value_metadata optional metadata, can be nullptr
-/// \param out the corresponding arrow schema
-/// \return Status::OK() on a successful conversion.
-PARQUET_EXPORT
-::arrow::Status FromParquetSchema(
-    const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
-    const ArrowReaderProperties& properties,
-    const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata,
-    std::shared_ptr<::arrow::Schema>* out);
-
-// Without indices
-PARQUET_EXPORT
-::arrow::Status FromParquetSchema(
-    const SchemaDescriptor* parquet_schema, const ArrowReaderProperties& properties,
-    const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata,
-    std::shared_ptr<::arrow::Schema>* out);
-
-// Without metadata
-PARQUET_EXPORT
-::arrow::Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
-                                  const std::vector<int>& column_indices,
-                                  const ArrowReaderProperties& properties,
-                                  std::shared_ptr<::arrow::Schema>* out);
-
-// Without metadata or indices
-PARQUET_EXPORT
-::arrow::Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
-                                  const ArrowReaderProperties& properties,
-                                  std::shared_ptr<::arrow::Schema>* out);
-
 PARQUET_EXPORT
 ::arrow::Status FieldToNode(const std::shared_ptr<::arrow::Field>& field,
                             const WriterProperties& properties,
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index bb2bab1335a..ee3b8807037 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
+#include <deque>
 #include <string>
 #include <utility>
 #include <vector>
@@ -33,6 +34,7 @@
 #include "arrow/util/logging.h"
 
 #include "parquet/arrow/reader.h"
+#include "parquet/arrow/reader_internal.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/column_writer.h"
 #include "parquet/exception.h"
@@ -78,7 +80,12 @@ namespace {
 
 class LevelBuilder {
  public:
-  explicit LevelBuilder(MemoryPool* pool) : def_levels_(pool), rep_levels_(pool) {}
+  explicit LevelBuilder(MemoryPool* pool, const SchemaField* schema_field,
+                        const SchemaManifest* schema_manifest)
+      : def_levels_(pool),
+        rep_levels_(pool),
+        schema_field_(schema_field),
+        schema_manifest_(schema_manifest) {}
 
   Status VisitInline(const Array& array);
 
@@ -121,8 +128,23 @@ class LevelBuilder {
 
 #undef NOT_IMPLEMENTED_VISIT
 
-  Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
-                        int64_t* values_offset, int64_t* num_values, int64_t* num_levels,
+  Status ExtractNullability() {
+    // Walk upwards to extract nullability
+    const SchemaField* current_field = schema_field_;
+    while (current_field != nullptr) {
+      nullable_.push_front(current_field->field->nullable());
+      if (current_field->field->type()->num_children() > 1) {
+        return Status::NotImplemented(
+            "Fields with more than one child are not supported.");
+      } else {
+        current_field = schema_manifest_->GetParent(current_field);
+      }
+    }
+    return Status::OK();
+  }
+
+  Status GenerateLevels(const Array& array, int64_t* values_offset, int64_t* num_values,
+                        int64_t* num_levels,
                         const std::shared_ptr<ResizableBuffer>& def_levels_scratch,
                         std::shared_ptr<Buffer>* def_levels_out,
                         std::shared_ptr<Buffer>* rep_levels_out,
@@ -135,18 +157,7 @@ class LevelBuilder {
     *values_offset = min_offset_idx_;
     *values_array = values_array_;
 
-    // Walk downwards to extract nullability
-    std::shared_ptr<Field> current_field = field;
-    nullable_.push_back(current_field->nullable());
-    while (current_field->type()->num_children() > 0) {
-      if (current_field->type()->num_children() > 1) {
-        return Status::NotImplemented(
-            "Fields with more than one child are not supported.");
-      } else {
-        current_field = current_field->type()->child(0);
-      }
-      nullable_.push_back(current_field->nullable());
-    }
+    RETURN_NOT_OK(ExtractNullability());
 
     // Generate the levels.
     if (nullable_.size() == 1) {
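ExtractNullability walks from the leaf toward the root, so entries are discovered leaf-first; push_front onto a deque restores the root-first order the old downward walk produced with push_back. An illustrative case (field names hypothetical):

    // For a nullable column "a" of type
    //   list(field("item", int32(), /*nullable=*/false)),
    // the walk visits "item", then "a"; after two push_front calls,
    // nullable_ == {true, false}, i.e. root-first, matching the old behavior.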
@@ -264,11 +275,14 @@ class LevelBuilder {
   Int16BufferBuilder def_levels_;
   Int16BufferBuilder rep_levels_;
 
+  const SchemaField* schema_field_;
+  const SchemaManifest* schema_manifest_;
+
   std::vector<int64_t> null_counts_;
   std::vector<const uint8_t*> valid_bitmaps_;
   std::vector<const int32_t*> offsets_;
   std::vector<int64_t> array_offsets_;
-  std::vector<bool> nullable_;
+  std::deque<bool> nullable_;
 
   int64_t min_offset_idx_;
   int64_t max_offset_idx_;
@@ -319,8 +333,12 @@ Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type
 
 class ArrowColumnWriter {
  public:
   ArrowColumnWriter(ColumnWriterContext* ctx, ColumnWriter* column_writer,
-                    const std::shared_ptr<Field>& field)
-      : ctx_(ctx), writer_(column_writer), field_(field) {}
+                    const SchemaField* schema_field,
+                    const SchemaManifest* schema_manifest)
+      : ctx_(ctx),
+        writer_(column_writer),
+        schema_field_(schema_field),
+        schema_manifest_(schema_manifest) {}
 
   Status Write(const Array& data);
 
@@ -430,7 +448,8 @@ class ArrowColumnWriter {
 
   ColumnWriterContext* ctx_;
   ColumnWriter* writer_;
-  std::shared_ptr<Field> field_;
+  const SchemaField* schema_field_;
+  const SchemaManifest* schema_manifest_;
 };
 
 template <typename ParquetType, typename ArrowType>
@@ -945,11 +964,10 @@ Status ArrowColumnWriter::Write(const Array& data) {
   int64_t values_offset = 0;
   int64_t num_levels = 0;
   int64_t num_values = 0;
-  LevelBuilder level_builder(ctx_->memory_pool);
-
+  LevelBuilder level_builder(ctx_->memory_pool, schema_field_, schema_manifest_);
   std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
   RETURN_NOT_OK(level_builder.GenerateLevels(
-      data, field_, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
+      data, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
       &def_levels_buffer, &rep_levels_buffer, &_values_array));
   const int16_t* def_levels = nullptr;
   if (def_levels_buffer) {
@@ -1021,6 +1039,11 @@ class FileWriter::Impl {
         arrow_properties_(arrow_properties),
         closed_(false) {}
 
+  Status Init() {
+    return BuildSchemaManifest(writer_->schema(), default_arrow_reader_properties(),
+                               &schema_manifest_);
+  }
+
   Status NewRowGroup(int64_t chunk_size) {
     if (row_group_writer_ != nullptr) {
       PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
@@ -1075,17 +1098,11 @@ class FileWriter::Impl {
     ColumnWriter* column_writer;
     PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
 
-    // TODO(wesm): This trick to construct a schema for one Parquet root node
-    // will not work for arbitrary nested data
-    int current_column_idx = row_group_writer_->current_column();
-    std::shared_ptr<::arrow::Schema> arrow_schema;
-    RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
-                                    default_arrow_reader_properties(),
-                                    writer_->key_value_metadata(), &arrow_schema));
-
-    ArrowColumnWriter arrow_writer(&column_write_context_, column_writer,
-                                   arrow_schema->field(0));
-
+    const SchemaField* schema_field;
+    RETURN_NOT_OK(schema_manifest_.GetColumnField(row_group_writer_->current_column(),
+                                                  &schema_field));
+    ArrowColumnWriter arrow_writer(&column_write_context_, column_writer, schema_field,
+                                   &schema_manifest_);
     RETURN_NOT_OK(arrow_writer.Write(*data, offset, size));
     return arrow_writer.Close();
   }
@@ -1101,6 +1118,8 @@ class FileWriter::Impl {
  private:
   friend class FileWriter;
 
+  SchemaManifest schema_manifest_;
+
   std::unique_ptr<ParquetFileWriter> writer_;
   RowGroupWriter* row_group_writer_;
   ColumnWriterContext column_write_context_;
@@ -1135,6 +1154,15 @@ const std::shared_ptr<FileMetaData> FileWriter::metadata() const {
 
 FileWriter::~FileWriter() {}
 
+Status FileWriter::Make(::arrow::MemoryPool* pool,
+                        std::unique_ptr<ParquetFileWriter> writer,
+                        const std::shared_ptr<::arrow::Schema>& schema,
+                        const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+                        std::unique_ptr<FileWriter>* out) {
+  out->reset(new FileWriter(pool, std::move(writer), schema, arrow_properties));
+  return (*out)->impl_->Init();
+}
+
 FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
                        const std::shared_ptr<::arrow::Schema>& schema,
                        const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
@@ -1163,9 +1191,7 @@ Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool
       ParquetFileWriter::Open(sink, schema_node, properties, schema.metadata());
 
   auto schema_ptr = std::make_shared<::arrow::Schema>(schema);
-  writer->reset(
-      new FileWriter(pool, std::move(base_writer), schema_ptr, arrow_properties));
-  return Status::OK();
+  return Make(pool, std::move(base_writer), schema_ptr, arrow_properties, writer);
 }
 
 Status WriteFileMetaData(const FileMetaData& file_metadata,
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 2b9d892649a..8f7d4990e18 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -128,10 +128,11 @@ std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_properties();
  */
 class PARQUET_EXPORT FileWriter {
  public:
-  FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
-             const std::shared_ptr<::arrow::Schema>& schema,
-             const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
-                 default_arrow_writer_properties());
+  static ::arrow::Status Make(
+      ::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+      const std::shared_ptr<::arrow::Schema>& schema,
+      const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+      std::unique_ptr<FileWriter>* out);
 
   static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                               const std::shared_ptr<::arrow::io::OutputStream>& sink,
@@ -164,6 +165,10 @@ class PARQUET_EXPORT FileWriter {
   const std::shared_ptr<FileMetaData> metadata() const;
 
  private:
+  FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+             const std::shared_ptr<::arrow::Schema>& schema,
+             const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
+
   class PARQUET_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
   std::shared_ptr<::arrow::Schema> schema_;
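With the constructor now private, construction goes through the fallible factory so that manifest-building failures surface as a Status. A minimal usage sketch (assumes an existing std::unique_ptr<ParquetFileWriter> base_writer and an arrow_schema; illustrative only):

    std::unique_ptr<parquet::arrow::FileWriter> writer;
    RETURN_NOT_OK(parquet::arrow::FileWriter::Make(
        ::arrow::default_memory_pool(), std::move(base_writer), arrow_schema,
        parquet::arrow::default_arrow_writer_properties(), &writer));
    // writer is only handed out after Impl::Init() has built the SchemaManifest.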
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index a741cfa2b87..22c75fa05fb 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -87,7 +87,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
         properties_(properties),
         total_bytes_written_(0),
         closed_(false),
-        current_column_index_(0),
+        next_column_index_(0),
         num_rows_(0),
         buffered_row_group_(buffered_row_group) {
     if (buffered_row_group) {
@@ -122,7 +122,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
       total_bytes_written_ += column_writers_[0]->Close();
     }
 
-    ++current_column_index_;
+    ++next_column_index_;
 
     const ColumnDescriptor* column_descr = col_meta->descr();
     std::unique_ptr<PageWriter> pager =
@@ -192,7 +192,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
   const WriterProperties* properties_;
   int64_t total_bytes_written_;
   bool closed_;
-  int current_column_index_;
+  int next_column_index_;
   mutable int64_t num_rows_;
   bool buffered_row_group_;
 
@@ -203,7 +203,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
       if (num_rows_ == 0) {
         num_rows_ = current_col_rows;
       } else if (num_rows_ != current_col_rows) {
-        ThrowRowsMisMatchError(current_column_index_, current_col_rows, num_rows_);
+        ThrowRowsMisMatchError(next_column_index_, current_col_rows, num_rows_);
       }
     } else if (buffered_row_group_ && column_writers_.size() > 0) {
       // when buffered_row_group = true
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 85073d175f7..59ebaea77d9 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -767,32 +767,32 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
  public:
   explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
                                        const SchemaDescriptor* schema, void* contents)
-      : properties_(props), schema_(schema), current_column_(0) {
+      : properties_(props), schema_(schema), next_column_(0) {
     row_group_ = reinterpret_cast<format::RowGroup*>(contents);
     InitializeColumns(schema->num_columns());
   }
 
   ColumnChunkMetaDataBuilder* NextColumnChunk() {
-    if (!(current_column_ < num_columns())) {
+    if (!(next_column_ < num_columns())) {
       std::stringstream ss;
       ss << "The schema only has " << num_columns()
-         << " columns, requested metadata for column: " << current_column_;
+         << " columns, requested metadata for column: " << next_column_;
       throw ParquetException(ss.str());
     }
-    auto column = schema_->Column(current_column_);
+    auto column = schema_->Column(next_column_);
     auto column_builder = ColumnChunkMetaDataBuilder::Make(
-        properties_, column, &row_group_->columns[current_column_++]);
+        properties_, column, &row_group_->columns[next_column_++]);
     auto column_builder_ptr = column_builder.get();
     column_builders_.push_back(std::move(column_builder));
     return column_builder_ptr;
   }
 
-  int current_column() { return current_column_; }
+  int current_column() { return next_column_ - 1; }
 
   void Finish(int64_t total_bytes_written) {
-    if (!(current_column_ == schema_->num_columns())) {
+    if (!(next_column_ == schema_->num_columns())) {
       std::stringstream ss;
-      ss << "Only " << current_column_ - 1 << " out of " << schema_->num_columns()
+      ss << "Only " << next_column_ - 1 << " out of " << schema_->num_columns()
         << " columns are initialized";
       throw ParquetException(ss.str());
     }
@@ -825,7 +825,7 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
   const std::shared_ptr<WriterProperties> properties_;
   const SchemaDescriptor* schema_;
   std::vector<std::unique_ptr<ColumnChunkMetaDataBuilder>> column_builders_;
-  int current_column_;
+  int next_column_;
 };
 
 std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
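The next_column_ rename makes the off-by-one explicit: the counter names the next column to hand out, while current_column() reports the one most recently returned. Illustrative:

    // next_column_ == 0, current_column() == -1   (nothing handed out yet)
    // after one NextColumnChunk():
    // next_column_ == 1, current_column() == 0    (index of the chunk just returned)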
-  auto group = static_cast<const GroupNode*>(node);
-  if (group->field_count() == 1 && HasStructListName(*group)) return false;
-
-  return true;
-}
-
-// Coalesce a list of schema field indices which are the roots of the
-// columns referred to by a list of column indices
-inline bool ColumnIndicesToFieldIndices(const SchemaDescriptor& descr,
-                                        const std::vector<int>& column_indices,
-                                        std::vector<int>* out) {
-  const GroupNode* group = descr.group_node();
-  std::unordered_set<int> already_added;
-  out->clear();
-  for (auto& column_idx : column_indices) {
-    auto field_node = descr.GetColumnRoot(column_idx);
-    auto field_idx = group->FieldIndex(*field_node);
-    if (field_idx < 0) {
-      return false;
-    }
-    auto insertion = already_added.insert(field_idx);
-    if (insertion.second) {
-      out->push_back(field_idx);
-    }
-  }
-
-  return true;
-}
-
 // ----------------------------------------------------------------------
 // Conversion from Parquet Thrift metadata
diff --git a/cpp/src/parquet/schema.cc b/cpp/src/parquet/schema.cc
index 71aa919e91a..e961127bc1c 100644
--- a/cpp/src/parquet/schema.cc
+++ b/cpp/src/parquet/schema.cc
@@ -857,6 +857,9 @@ void SchemaDescriptor::BuildTree(const NodePtr& node, int16_t max_def_level,
       BuildTree(group->field(i), max_def_level, max_rep_level, base);
     }
   } else {
+    node_to_leaf_index_[static_cast<const PrimitiveNode*>(node.get())] =
+        static_cast<int>(leaves_.size());
+
     // Primitive node, append to leaves
     leaves_.push_back(ColumnDescriptor(node, max_def_level, max_rep_level, this));
     leaf_to_base_.emplace(static_cast<int>(leaves_.size()) - 1, base);
@@ -865,6 +868,14 @@ void SchemaDescriptor::BuildTree(const NodePtr& node, int16_t max_def_level,
   }
 }
 
+int SchemaDescriptor::GetColumnIndex(const PrimitiveNode& node) const {
+  auto it = node_to_leaf_index_.find(&node);
+  if (it == node_to_leaf_index_.end()) {
+    return -1;
+  }
+  return it->second;
+}
+
 ColumnDescriptor::ColumnDescriptor(const schema::NodePtr& node,
                                    int16_t max_definition_level,
                                    int16_t max_repetition_level,
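GetColumnIndex gives the writer path a direct node-to-leaf lookup instead of reconverting schemas per column. A hypothetical use against a SchemaDescriptor descr (illustrative only):

    const auto& leaf = static_cast<const parquet::schema::PrimitiveNode&>(
        *descr.Column(0)->schema_node());
    int index = descr.GetColumnIndex(leaf);  // 0 here; -1 for a foreign node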
diff --git a/cpp/src/parquet/schema.h b/cpp/src/parquet/schema.h
index 566d5fb02e8..8fb3a5482f4 100644
--- a/cpp/src/parquet/schema.h
+++ b/cpp/src/parquet/schema.h
@@ -430,6 +430,10 @@ class PARQUET_EXPORT SchemaDescriptor {
 
   void updateColumnOrders(const std::vector<ColumnOrder>& column_orders);
 
+  /// \brief Return column index corresponding to a particular
+  /// PrimitiveNode. Returns -1 if not found.
+  int GetColumnIndex(const schema::PrimitiveNode& node) const;
+
  private:
   friend class ColumnDescriptor;
 
@@ -444,6 +448,8 @@ class PARQUET_EXPORT SchemaDescriptor {
   // Result of leaf node / tree analysis
   std::vector<ColumnDescriptor> leaves_;
 
+  std::unordered_map<const schema::PrimitiveNode*, int> node_to_leaf_index_;
+
   // Mapping between leaf nodes and root group of leaf (first node
   // below the schema's root group)
   //
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 270bb619f75..3a9dc9ba1c1 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -390,14 +390,14 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
 
         void set_use_threads(c_bool use_threads)
 
-
-cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
     CStatus FromParquetSchema(
         const SchemaDescriptor* parquet_schema,
         const ArrowReaderProperties& properties,
         const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
         shared_ptr[CSchema]* out)
 
+cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
+
     CStatus ToParquetSchema(
         const CSchema* arrow_schema,
         const ArrowReaderProperties& properties,
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 2031feaaaa5..7af16a72539 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -752,12 +752,8 @@ write_parquet_file <- function(table, filename){
     invisible(.Call(`_arrow_write_parquet_file` , table, filename))
 }
 
-parquet___arrow___FileReader__GetSchema2 <- function(reader, indices){
-    .Call(`_arrow_parquet___arrow___FileReader__GetSchema2` , reader, indices)
-}
-
-parquet___arrow___FileReader__GetSchema1 <- function(reader){
-    .Call(`_arrow_parquet___arrow___FileReader__GetSchema1` , reader)
+parquet___arrow___FileReader__GetSchema <- function(reader){
+    .Call(`_arrow_parquet___arrow___FileReader__GetSchema` , reader)
 }
 
 RecordBatch__num_columns <- function(x){
diff --git a/r/R/parquet.R b/r/R/parquet.R
index 2647536409c..b75f93ea804 100644
--- a/r/R/parquet.R
+++ b/r/R/parquet.R
@@ -25,18 +25,13 @@
       if(quo_is_null(col_select)) {
         shared_ptr(`arrow::Table`, parquet___arrow___FileReader__ReadTable1(self))
       } else {
-        all_vars <- shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema1(self))$names
+        all_vars <- shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema(self))$names
         indices <- match(vars_select(all_vars, !!col_select), all_vars) - 1L
         shared_ptr(`arrow::Table`, parquet___arrow___FileReader__ReadTable2(self, indices))
       }
     },
-    GetSchema = function(column_indices = NULL) {
-      if (is.null(column_indices)) {
-        shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema1(self))
-      } else {
-        shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema2(self, column_indices))
-      }
-
+    GetSchema = function() {
+      shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema(self))
     }
   )
 )
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index bcb0ac59b95..8cfaa7714ea 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -2900,32 +2900,16 @@ RcppExport SEXP _arrow_write_parquet_file(SEXP table_sexp, SEXP filename_sexp){
 
 // parquet.cpp
 #if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema2(const std::unique_ptr<parquet::arrow::FileReader>& reader, const std::vector<int>& indices);
-RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema2(SEXP reader_sexp, SEXP indices_sexp){
+std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema(const std::unique_ptr<parquet::arrow::FileReader>& reader);
+RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema(SEXP reader_sexp){
 BEGIN_RCPP
     Rcpp::traits::input_parameter<const std::unique_ptr<parquet::arrow::FileReader>&>::type reader(reader_sexp);
-    Rcpp::traits::input_parameter<const std::vector<int>&>::type indices(indices_sexp);
-    return Rcpp::wrap(parquet___arrow___FileReader__GetSchema2(reader, indices));
+    return Rcpp::wrap(parquet___arrow___FileReader__GetSchema(reader));
 END_RCPP
 }
 #else
-RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema2(SEXP reader_sexp, SEXP indices_sexp){
-    Rf_error("Cannot call parquet___arrow___FileReader__GetSchema2(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// parquet.cpp
-#if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema1(const std::unique_ptr<parquet::arrow::FileReader>& reader);
-RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema1(SEXP reader_sexp){
-BEGIN_RCPP
-    Rcpp::traits::input_parameter<const std::unique_ptr<parquet::arrow::FileReader>&>::type reader(reader_sexp);
-    return Rcpp::wrap(parquet___arrow___FileReader__GetSchema1(reader));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema1(SEXP reader_sexp){
-    Rf_error("Cannot call parquet___arrow___FileReader__GetSchema1(). Please use arrow::install_arrow() to install required runtime libraries. ");
+RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema(SEXP reader_sexp){
+    Rf_error("Cannot call parquet___arrow___FileReader__GetSchema(). Please use arrow::install_arrow() to install required runtime libraries. ");
 }
 #endif
 
@@ -3904,8 +3888,7 @@ static const R_CallMethodDef CallEntries[] = {
    { "_arrow_parquet___arrow___FileReader__ReadTable1", (DL_FUNC) &_arrow_parquet___arrow___FileReader__ReadTable1, 1},
    { "_arrow_parquet___arrow___FileReader__ReadTable2", (DL_FUNC) &_arrow_parquet___arrow___FileReader__ReadTable2, 2},
    { "_arrow_write_parquet_file", (DL_FUNC) &_arrow_write_parquet_file, 2},
-   { "_arrow_parquet___arrow___FileReader__GetSchema2", (DL_FUNC) &_arrow_parquet___arrow___FileReader__GetSchema2, 2},
-   { "_arrow_parquet___arrow___FileReader__GetSchema1", (DL_FUNC) &_arrow_parquet___arrow___FileReader__GetSchema1, 1},
+   { "_arrow_parquet___arrow___FileReader__GetSchema", (DL_FUNC) &_arrow_parquet___arrow___FileReader__GetSchema, 1},
    { "_arrow_RecordBatch__num_columns", (DL_FUNC) &_arrow_RecordBatch__num_columns, 1},
    { "_arrow_RecordBatch__num_rows", (DL_FUNC) &_arrow_RecordBatch__num_rows, 1},
    { "_arrow_RecordBatch__schema", (DL_FUNC) &_arrow_RecordBatch__schema, 1},
diff --git a/r/src/parquet.cpp b/r/src/parquet.cpp
index 9ad14381371..5124e9e32cc 100644
--- a/r/src/parquet.cpp
+++ b/r/src/parquet.cpp
@@ -94,26 +94,11 @@ void write_parquet_file(const std::shared_ptr<arrow::Table>& table,
 }
 
 // [[arrow::export]]
-std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema2(
-    const std::unique_ptr<parquet::arrow::FileReader>& reader,
-    const std::vector<int>& indices) {
+std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema(
+    const std::unique_ptr<parquet::arrow::FileReader>& reader) {
   std::shared_ptr<arrow::Schema> schema;
-  STOP_IF_NOT_OK(reader->GetSchema(indices, &schema));
+  STOP_IF_NOT_OK(reader->GetSchema(&schema));
   return schema;
 }
 
-// [[arrow::export]]
-std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema1(
-    const std::unique_ptr<parquet::arrow::FileReader>& reader) {
-  // FileReader does not have this exposed
-  // std::shared_ptr<arrow::Schema> schema;
-  // STOP_IF_NOT_OK(reader->GetSchema(&schema));
-
-  // so going indirectly about it
-  std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
-  STOP_IF_NOT_OK(reader->GetRecordBatchReader({}, &record_batch_reader));
-
-  return record_batch_reader->schema();
-}
-
 #endif
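Taken together, the reader side now exposes a single schema accessor, and column selection moves to read time. A closing sketch of the consolidated C++ usage (assumes a parquet::arrow::FileReader* reader; illustrative, error handling elided):

    std::shared_ptr<::arrow::Schema> schema;
    RETURN_NOT_OK(reader->GetSchema(&schema));         // no index argument anymore
    std::shared_ptr<::arrow::Table> table;
    RETURN_NOT_OK(reader->ReadTable({0, 1}, &table));  // select columns when reading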