From 2f52cf4eed1033d1bf1f043d9063e462e60d6605 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Sun, 12 Jun 2016 11:48:10 +0200 Subject: [PATCH] ARROW-215: Support other integer types and strings in Parquet I/O Change-Id: I72c6c82bc38c895a04172531bebbc78d4fb08732 --- cpp/src/arrow/parquet/parquet-io-test.cc | 461 ++++++++++++------- cpp/src/arrow/parquet/parquet-schema-test.cc | 4 +- cpp/src/arrow/parquet/reader.cc | 160 ++++++- cpp/src/arrow/parquet/schema.cc | 47 +- cpp/src/arrow/parquet/schema.h | 9 +- cpp/src/arrow/parquet/test-util.h | 136 +++++- cpp/src/arrow/parquet/writer.cc | 234 ++++++++-- cpp/src/arrow/parquet/writer.h | 9 +- cpp/src/arrow/test-util.h | 2 + cpp/src/arrow/types/primitive.cc | 5 + python/pyarrow/includes/parquet.pxd | 13 +- python/pyarrow/parquet.pyx | 22 +- python/pyarrow/tests/test_parquet.py | 43 +- 13 files changed, 901 insertions(+), 244 deletions(-) diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index edcac887056..572cae16e58 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -21,7 +21,9 @@ #include "arrow/parquet/test-util.h" #include "arrow/parquet/reader.h" #include "arrow/parquet/writer.h" +#include "arrow/types/construct.h" #include "arrow/types/primitive.h" +#include "arrow/types/string.h" #include "arrow/util/memory-pool.h" #include "arrow/util/status.h" @@ -30,12 +32,15 @@ using ParquetBuffer = parquet::Buffer; using parquet::BufferReader; +using parquet::default_writer_properties; using parquet::InMemoryOutputStream; +using parquet::LogicalType; using parquet::ParquetFileReader; using parquet::ParquetFileWriter; using parquet::RandomAccessSource; using parquet::Repetition; using parquet::SchemaDescriptor; +using parquet::ParquetVersion; using ParquetType = parquet::Type; using parquet::schema::GroupNode; using parquet::schema::NodePtr; @@ -51,26 +56,114 @@ const int LARGE_SIZE = 10000; template struct test_traits {}; +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static uint8_t const value; +}; + +const uint8_t test_traits::value(1); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_8; + static uint8_t const value; +}; + +const uint8_t test_traits::value(64); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::INT_8; + static int8_t const value; +}; + +const int8_t test_traits::value(-64); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_16; + static uint16_t const value; +}; + +const uint16_t test_traits::value(1024); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::INT_16; + static int16_t const value; +}; + +const int16_t test_traits::value(-1024); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_32; + static uint32_t const value; +}; + +const uint32_t test_traits::value(1024); + template 
<> struct test_traits { static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static int32_t const value; +}; + +const int32_t test_traits::value(-1024); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::INT64; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_64; + static uint64_t const value; }; +const uint64_t test_traits::value(1024); + template <> struct test_traits { static constexpr ParquetType::type parquet_enum = ParquetType::INT64; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static int64_t const value; }; +const int64_t test_traits::value(-1024); + template <> struct test_traits { static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static float const value; }; +const float test_traits::value(2.1f); + template <> struct test_traits { static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static double const value; +}; + +const double test_traits::value(4.2); + +template <> +struct test_traits { + static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY; + static constexpr LogicalType::type logical_enum = LogicalType::UTF8; + static std::string const value; }; +const std::string test_traits::value("Test"); + template using ParquetDataType = ::parquet::DataType::parquet_enum>; @@ -80,18 +173,18 @@ using ParquetWriter = ::parquet::TypedColumnWriter>; template class TestParquetIO : public ::testing::Test { public: - typedef typename TestType::c_type T; virtual void SetUp() {} - std::shared_ptr MakeSchema( - ParquetType::type parquet_type, Repetition::type repetition) { - auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type); + std::shared_ptr MakeSchema(Repetition::type repetition) { + auto pnode = PrimitiveNode::Make("column1", repetition, + test_traits::parquet_enum, test_traits::logical_enum); NodePtr node_ = GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); return std::static_pointer_cast(node_); } - std::unique_ptr MakeWriter(std::shared_ptr& schema) { + std::unique_ptr MakeWriter( + const std::shared_ptr& schema) { sink_ = std::make_shared(); return ParquetFileWriter::Open(sink_, schema); } @@ -106,113 +199,74 @@ class TestParquetIO : public ::testing::Test { std::unique_ptr file_reader, std::shared_ptr* out) { arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); std::unique_ptr column_reader; - ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader))); + ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader)); ASSERT_NE(nullptr, column_reader.get()); + ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out)); ASSERT_NE(nullptr, out->get()); } + void ReadAndCheckSingleColumnFile(Array* values) { + std::shared_ptr out; + ReadSingleColumnFile(ReaderFromSink(), &out); + ASSERT_TRUE(values->Equals(out)); + } + void ReadTableFromFile( std::unique_ptr file_reader, std::shared_ptr* out) { arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); - ASSERT_NO_THROW(ASSERT_OK(reader.ReadFlatTable(out))); + ASSERT_OK_NO_THROW(reader.ReadFlatTable(out)); ASSERT_NE(nullptr, out->get()); } - std::unique_ptr TestFile(std::vector& values, int num_chunks) { - std::shared_ptr schema = - 
MakeSchema(test_traits::parquet_enum, Repetition::REQUIRED); - std::unique_ptr file_writer = MakeWriter(schema); - size_t chunk_size = values.size() / num_chunks; - for (int i = 0; i < num_chunks; i++) { - auto row_group_writer = file_writer->AppendRowGroup(chunk_size); - auto column_writer = - static_cast*>(row_group_writer->NextColumn()); - T* data = values.data() + i * chunk_size; - column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); - column_writer->Close(); - row_group_writer->Close(); - } - file_writer->Close(); - return ReaderFromSink(); + void ReadAndCheckSingleColumnTable(const std::shared_ptr& values) { + std::shared_ptr
out; + ReadTableFromFile(ReaderFromSink(), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(values->length(), out->num_rows()); + + std::shared_ptr chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); + } + + template + void WriteFlatColumn(const std::shared_ptr& schema, + const std::shared_ptr& values) { + FileWriter writer(default_memory_pool(), MakeWriter(schema)); + ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length())); + ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get())); + ASSERT_OK_NO_THROW(writer.Close()); } std::shared_ptr sink_; }; -typedef ::testing::Types TestTypes; - -TYPED_TEST_CASE(TestParquetIO, TestTypes); - -TYPED_TEST(TestParquetIO, SingleColumnRequiredRead) { - std::vector values(SMALL_SIZE, 128); - std::unique_ptr file_reader = this->TestFile(values, 1); - - std::shared_ptr out; - this->ReadSingleColumnFile(std::move(file_reader), &out); - - ExpectArray(values.data(), out.get()); -} - -TYPED_TEST(TestParquetIO, SingleColumnRequiredTableRead) { - std::vector values(SMALL_SIZE, 128); - std::unique_ptr file_reader = this->TestFile(values, 1); - - std::shared_ptr
out; - this->ReadTableFromFile(std::move(file_reader), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(SMALL_SIZE, out->num_rows()); - - std::shared_ptr chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ExpectArray(values.data(), chunked_array->chunk(0).get()); -} - -TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedRead) { - std::vector values(SMALL_SIZE, 128); - std::unique_ptr file_reader = this->TestFile(values, 4); - - std::shared_ptr out; - this->ReadSingleColumnFile(std::move(file_reader), &out); +// We have separate tests for UInt32Type as this is currently the only type +// where a roundtrip does not yield the identical Array structure. +// For Parquet version 1.0 we write a UInt32 Array but receive an Int64 Array +// back as the result. - ExpectArray(values.data(), out.get()); -} - -TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedTableRead) { - std::vector values(SMALL_SIZE, 128); - std::unique_ptr file_reader = this->TestFile(values, 4); - - std::shared_ptr
out; - this->ReadTableFromFile(std::move(file_reader), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(SMALL_SIZE, out->num_rows()); +typedef ::testing::Types TestTypes; - std::shared_ptr chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ExpectArray(values.data(), chunked_array->chunk(0).get()); -} +TYPED_TEST_CASE(TestParquetIO, TestTypes); TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) { - std::shared_ptr values = NonNullArray(SMALL_SIZE, 128); + auto values = NonNullArray(SMALL_SIZE); - std::shared_ptr schema = - this->MakeSchema(test_traits::parquet_enum, Repetition::REQUIRED); - FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); - ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length()))); - ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get()))); - ASSERT_NO_THROW(ASSERT_OK(writer.Close())); + std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); + this->WriteFlatColumn(schema, values); - std::shared_ptr out; - this->ReadSingleColumnFile(this->ReaderFromSink(), &out); - ASSERT_TRUE(values->Equals(out)); + this->ReadAndCheckSingleColumnFile(values.get()); } TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { - std::shared_ptr values = NonNullArray(SMALL_SIZE, 128); + auto values = NonNullArray(SMALL_SIZE); std::shared_ptr
table = MakeSimpleTable(values, false); this->sink_ = std::make_shared(); - ASSERT_NO_THROW(ASSERT_OK( - WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length()))); + ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, + values->length(), default_writer_properties())); std::shared_ptr
out; this->ReadTableFromFile(this->ReaderFromSink(), &out); @@ -226,113 +280,208 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { // This also tests max_definition_level = 1 - std::shared_ptr values = NullableArray(SMALL_SIZE, 128, 10); + auto values = NullableArray(SMALL_SIZE, 10); - std::shared_ptr schema = - this->MakeSchema(test_traits::parquet_enum, Repetition::OPTIONAL); - FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); - ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length()))); - ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get()))); - ASSERT_NO_THROW(ASSERT_OK(writer.Close())); + std::shared_ptr schema = this->MakeSchema(Repetition::OPTIONAL); + this->WriteFlatColumn(schema, values); - std::shared_ptr out; - this->ReadSingleColumnFile(this->ReaderFromSink(), &out); - ASSERT_TRUE(values->Equals(out)); + this->ReadAndCheckSingleColumnFile(values.get()); } TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { // This also tests max_definition_level = 1 - std::shared_ptr values = NullableArray(SMALL_SIZE, 128, 10); + std::shared_ptr values = NullableArray(SMALL_SIZE, 10); std::shared_ptr
table = MakeSimpleTable(values, true); this->sink_ = std::make_shared(); - ASSERT_NO_THROW(ASSERT_OK( - WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length()))); - - std::shared_ptr
out; - this->ReadTableFromFile(this->ReaderFromSink(), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(SMALL_SIZE, out->num_rows()); + ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, + values->length(), default_writer_properties())); - std::shared_ptr chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); + this->ReadAndCheckSingleColumnTable(values); } -TYPED_TEST(TestParquetIO, SingleColumnIntRequiredChunkedWrite) { - std::shared_ptr values = NonNullArray(SMALL_SIZE, 128); - std::shared_ptr values_chunk = - NonNullArray(SMALL_SIZE / 4, 128); +TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { + auto values = NonNullArray(SMALL_SIZE); + int64_t chunk_size = values->length() / 4; - std::shared_ptr schema = - this->MakeSchema(test_traits::parquet_enum, Repetition::REQUIRED); + std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); for (int i = 0; i < 4; i++) { - ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length()))); - ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get()))); + ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); + ASSERT_OK_NO_THROW( + writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size)); } - ASSERT_NO_THROW(ASSERT_OK(writer.Close())); + ASSERT_OK_NO_THROW(writer.Close()); - std::shared_ptr out; - this->ReadSingleColumnFile(this->ReaderFromSink(), &out); - ASSERT_TRUE(values->Equals(out)); + this->ReadAndCheckSingleColumnFile(values.get()); } TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { - std::shared_ptr values = NonNullArray(LARGE_SIZE, 128); + auto values = NonNullArray(LARGE_SIZE); std::shared_ptr
table = MakeSimpleTable(values, false); this->sink_ = std::make_shared(); - ASSERT_NO_THROW( - ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512))); - - std::shared_ptr
out; - this->ReadTableFromFile(this->ReaderFromSink(), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(LARGE_SIZE, out->num_rows()); + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties())); - std::shared_ptr chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); + this->ReadAndCheckSingleColumnTable(values); } TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { - std::shared_ptr values = NullableArray(SMALL_SIZE, 128, 10); - std::shared_ptr values_chunk_nulls = - NullableArray(SMALL_SIZE / 4, 128, 10); - std::shared_ptr values_chunk = - NullableArray(SMALL_SIZE / 4, 128, 0); - - std::shared_ptr schema = - this->MakeSchema(test_traits::parquet_enum, Repetition::OPTIONAL); + int64_t chunk_size = SMALL_SIZE / 4; + auto values = NullableArray(SMALL_SIZE, 10); + + std::shared_ptr schema = this->MakeSchema(Repetition::OPTIONAL); FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); - ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length()))); - ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get()))); - for (int i = 0; i < 3; i++) { - ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length()))); - ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get()))); + for (int i = 0; i < 4; i++) { + ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); + ASSERT_OK_NO_THROW( + writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size)); } - ASSERT_NO_THROW(ASSERT_OK(writer.Close())); + ASSERT_OK_NO_THROW(writer.Close()); - std::shared_ptr out; - this->ReadSingleColumnFile(this->ReaderFromSink(), &out); - ASSERT_TRUE(values->Equals(out)); + this->ReadAndCheckSingleColumnFile(values.get()); } TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { // This also tests max_definition_level = 1 - std::shared_ptr values = NullableArray(LARGE_SIZE, 128, 100); + auto values = NullableArray(LARGE_SIZE, 100); std::shared_ptr
table = MakeSimpleTable(values, true); this->sink_ = std::make_shared(); - ASSERT_NO_THROW( - ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512))); + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties())); - std::shared_ptr
out; - this->ReadTableFromFile(this->ReaderFromSink(), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(LARGE_SIZE, out->num_rows()); + this->ReadAndCheckSingleColumnTable(values); +} - std::shared_ptr chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); +using TestUInt32ParquetIO = TestParquetIO; + +TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compatibility) { + // This also tests max_definition_level = 1 + std::shared_ptr values = NullableArray(LARGE_SIZE, 100); + std::shared_ptr
table = MakeSimpleTable(values, true); + + // Parquet 2.0 roundtrip should yield a uint32_t column again + this->sink_ = std::make_shared(); + std::shared_ptr<::parquet::WriterProperties> properties = + ::parquet::WriterProperties::Builder() + .version(ParquetVersion::PARQUET_2_0) + ->build(); + ASSERT_OK_NO_THROW( + WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); + this->ReadAndCheckSingleColumnTable(values); +} + +TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compatibility) { + // This also tests max_definition_level = 1 + std::shared_ptr values = NullableArray(LARGE_SIZE, 100); + std::shared_ptr
table = MakeSimpleTable(values, true); + + // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0 + // reader that a column is unsigned. + this->sink_ = std::make_shared(); + std::shared_ptr<::parquet::WriterProperties> properties = + ::parquet::WriterProperties::Builder() + .version(ParquetVersion::PARQUET_1_0) + ->build(); + ASSERT_OK_NO_THROW( + WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); + + std::shared_ptr expected_values; + std::shared_ptr int64_data = + std::make_shared(default_memory_pool()); + { + ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length())); + int64_t* int64_data_ptr = reinterpret_cast(int64_data->mutable_data()); + const uint32_t* uint32_data_ptr = + reinterpret_cast(values->data()->data()); + // std::copy might be faster but this is explicit on the casts) + for (int64_t i = 0; i < values->length(); i++) { + int64_data_ptr[i] = static_cast(uint32_data_ptr[i]); + } + } + ASSERT_OK(MakePrimitiveArray(std::make_shared(), values->length(), + int64_data, values->null_count(), values->null_bitmap(), &expected_values)); + this->ReadAndCheckSingleColumnTable(expected_values); +} + +template +using ParquetCDataType = typename ParquetDataType::c_type; + +template +class TestPrimitiveParquetIO : public TestParquetIO { + public: + typedef typename TestType::c_type T; + + void TestFile(std::vector& values, int num_chunks, + std::unique_ptr* file_reader) { + std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); + std::unique_ptr file_writer = this->MakeWriter(schema); + size_t chunk_size = values.size() / num_chunks; + // Convert to Parquet's expected physical type + std::vector values_buffer( + sizeof(ParquetCDataType) * values.size()); + auto values_parquet = + reinterpret_cast*>(values_buffer.data()); + std::copy(values.cbegin(), values.cend(), values_parquet); + for (int i = 0; i < num_chunks; i++) { + auto row_group_writer = file_writer->AppendRowGroup(chunk_size); + auto column_writer = + static_cast*>(row_group_writer->NextColumn()); + ParquetCDataType* data = values_parquet + i * chunk_size; + column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); + column_writer->Close(); + row_group_writer->Close(); + } + file_writer->Close(); + *file_reader = this->ReaderFromSink(); + } + + void TestSingleColumnRequiredTableRead(int num_chunks) { + std::vector values(SMALL_SIZE, test_traits::value); + std::unique_ptr file_reader; + ASSERT_NO_THROW(TestFile(values, num_chunks, &file_reader)); + + std::shared_ptr
out; + this->ReadTableFromFile(std::move(file_reader), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(SMALL_SIZE, out->num_rows()); + + std::shared_ptr chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ExpectArray(values.data(), chunked_array->chunk(0).get()); + } + + void TestSingleColumnRequiredRead(int num_chunks) { + std::vector values(SMALL_SIZE, test_traits::value); + std::unique_ptr file_reader; + ASSERT_NO_THROW(TestFile(values, num_chunks, &file_reader)); + + std::shared_ptr out; + this->ReadSingleColumnFile(std::move(file_reader), &out); + + ExpectArray(values.data(), out.get()); + } +}; + +typedef ::testing::Types PrimitiveTestTypes; + +TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes); + +TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) { + this->TestSingleColumnRequiredRead(1); +} + +TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) { + this->TestSingleColumnRequiredTableRead(1); +} + +TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) { + this->TestSingleColumnRequiredRead(4); +} + +TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { + this->TestSingleColumnRequiredTableRead(4); } } // namespace parquet diff --git a/cpp/src/arrow/parquet/parquet-schema-test.cc b/cpp/src/arrow/parquet/parquet-schema-test.cc index 8de739491b5..819cdd3ec43 100644 --- a/cpp/src/arrow/parquet/parquet-schema-test.cc +++ b/cpp/src/arrow/parquet/parquet-schema-test.cc @@ -183,7 +183,9 @@ class TestConvertArrowSchema : public ::testing::Test { Status ConvertSchema(const std::vector>& fields) { arrow_schema_ = std::make_shared(fields); - return ToParquetSchema(arrow_schema_.get(), &result_schema_); + std::shared_ptr<::parquet::WriterProperties> properties = + ::parquet::default_writer_properties(); + return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_); } protected: diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index 3b4882d4439..7b05665b230 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -17,6 +17,7 @@ #include "arrow/parquet/reader.h" +#include #include #include #include @@ -27,6 +28,7 @@ #include "arrow/schema.h" #include "arrow/table.h" #include "arrow/types/primitive.h" +#include "arrow/types/string.h" #include "arrow/util/status.h" using parquet::ColumnReader; @@ -36,6 +38,19 @@ using parquet::TypedColumnReader; namespace arrow { namespace parquet { +template +struct ArrowTypeTraits { + typedef NumericBuilder builder_type; +}; + +template <> +struct ArrowTypeTraits { + typedef BooleanBuilder builder_type; +}; + +template +using BuilderType = typename ArrowTypeTraits::builder_type; + class FileReader::Impl { public: Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); @@ -61,9 +76,45 @@ class FlatColumnReader::Impl { template Status TypedReadBatch(int batch_size, std::shared_ptr* out); + template + Status ReadNullableFlatBatch(const int16_t* def_levels, + typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read, + BuilderType* builder); + template + Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read, + BuilderType* builder); + private: void NextRowGroup(); + template + struct can_copy_ptr { + static constexpr bool value = + std::is_same::value || + (std::is_integral{} && std::is_integral{} && + (sizeof(InType) == sizeof(OutType))); + }; + + template ::value>::type* = nullptr> + Status 
ConvertPhysicalType( + const InType* in_ptr, int64_t length, const OutType** out_ptr) { + *out_ptr = reinterpret_cast(in_ptr); + return Status::OK(); + } + + template ::value>::type* = nullptr> + Status ConvertPhysicalType( + const InType* in_ptr, int64_t length, const OutType** out_ptr) { + RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType))); + OutType* mutable_out_ptr = + reinterpret_cast(values_builder_buffer_.mutable_data()); + std::copy(in_ptr, in_ptr + length, mutable_out_ptr); + *out_ptr = mutable_out_ptr; + return Status::OK(); + } + MemoryPool* pool_; const ::parquet::ColumnDescriptor* descr_; ::parquet::ParquetFileReader* reader_; @@ -155,13 +206,53 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor NextRowGroup(); } +template +Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values, + int64_t values_read, BuilderType* builder) { + using ArrowCType = typename ArrowType::c_type; + using ParquetCType = typename ParquetType::c_type; + + DCHECK(builder); + const ArrowCType* values_ptr; + RETURN_NOT_OK( + (ConvertPhysicalType(values, values_read, &values_ptr))); + RETURN_NOT_OK(builder->Append(values_ptr, values_read)); + return Status::OK(); +} + +template +Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels, + typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read, + BuilderType* builder) { + using ArrowCType = typename ArrowType::c_type; + + DCHECK(builder); + RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType))); + RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t))); + auto values_ptr = reinterpret_cast(values_builder_buffer_.mutable_data()); + uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data(); + int values_idx = 0; + for (int64_t i = 0; i < levels_read; i++) { + if (def_levels[i] < descr_->max_definition_level()) { + valid_bytes[i] = 0; + } else { + valid_bytes[i] = 1; + values_ptr[i] = values[values_idx++]; + } + } + RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes)); + return Status::OK(); +} + template Status FlatColumnReader::Impl::TypedReadBatch( int batch_size, std::shared_ptr* out) { + using ParquetCType = typename ParquetType::c_type; + int values_to_read = batch_size; - NumericBuilder builder(pool_, field_->type); + BuilderType builder(pool_, field_->type); while ((values_to_read > 0) && column_reader_) { - values_buffer_.Resize(values_to_read * sizeof(typename ParquetType::c_type)); + values_buffer_.Resize(values_to_read * sizeof(ParquetCType)); if (descr_->max_definition_level() > 0) { def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); } @@ -169,31 +260,62 @@ Status FlatColumnReader::Impl::TypedReadBatch( int64_t values_read; int64_t levels_read; int16_t* def_levels = reinterpret_cast(def_levels_buffer_.mutable_data()); - auto values = - reinterpret_cast(values_buffer_.mutable_data()); + auto values = reinterpret_cast(values_buffer_.mutable_data()); PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch( values_to_read, def_levels, nullptr, values, &values_read)); values_to_read -= levels_read; if (descr_->max_definition_level() == 0) { - RETURN_NOT_OK(builder.Append(values, values_read)); + RETURN_NOT_OK( + (ReadNonNullableBatch(values, values_read, &builder))); + } else { + // As per the defintion and checks for flat columns: + // descr_->max_definition_level() == 1 + RETURN_NOT_OK((ReadNullableFlatBatch( + def_levels, values, values_read, levels_read, 
&builder))); + } + if (!column_reader_->HasNext()) { NextRowGroup(); } + } + *out = builder.Finish(); + return Status::OK(); +} + +template <> +Status FlatColumnReader::Impl::TypedReadBatch( + int batch_size, std::shared_ptr* out) { + int values_to_read = batch_size; + StringBuilder builder(pool_, field_->type); + while ((values_to_read > 0) && column_reader_) { + values_buffer_.Resize(values_to_read * sizeof(::parquet::ByteArray)); + if (descr_->max_definition_level() > 0) { + def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); + } + auto reader = + dynamic_cast*>(column_reader_.get()); + int64_t values_read; + int64_t levels_read; + int16_t* def_levels = reinterpret_cast(def_levels_buffer_.mutable_data()); + auto values = reinterpret_cast<::parquet::ByteArray*>(values_buffer_.mutable_data()); + PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch( + values_to_read, def_levels, nullptr, values, &values_read)); + values_to_read -= levels_read; + if (descr_->max_definition_level() == 0) { + for (int64_t i = 0; i < levels_read; i++) { + RETURN_NOT_OK( + builder.Append(reinterpret_cast(values[i].ptr), values[i].len)); + } } else { // descr_->max_definition_level() == 1 - RETURN_NOT_OK(values_builder_buffer_.Resize( - levels_read * sizeof(typename ParquetType::c_type))); - RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t))); - auto values_ptr = reinterpret_cast( - values_builder_buffer_.mutable_data()); - uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data(); int values_idx = 0; for (int64_t i = 0; i < levels_read; i++) { if (def_levels[i] < descr_->max_definition_level()) { - valid_bytes[i] = 0; + RETURN_NOT_OK(builder.AppendNull()); } else { - valid_bytes[i] = 1; - values_ptr[i] = values[values_idx++]; + RETURN_NOT_OK( + builder.Append(reinterpret_cast(values[values_idx].ptr), + values[values_idx].len)); + values_idx++; } } - builder.Append(values_ptr, levels_read, valid_bytes); } if (!column_reader_->HasNext()) { NextRowGroup(); } } @@ -214,10 +336,18 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* } switch (field_->type->type) { + TYPED_BATCH_CASE(BOOL, BooleanType, ::parquet::BooleanType) + TYPED_BATCH_CASE(UINT8, UInt8Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(INT8, Int8Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(UINT16, UInt16Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(INT16, Int16Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(UINT32, UInt32Type, ::parquet::Int32Type) TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(UINT64, UInt64Type, ::parquet::Int64Type) TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type) TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType) TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType) + TYPED_BATCH_CASE(STRING, StringType, ::parquet::ByteArrayType) default: return Status::NotImplemented(field_->type->ToString()); } diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc index c7979db3494..a79342afe2f 100644 --- a/cpp/src/arrow/parquet/schema.cc +++ b/cpp/src/arrow/parquet/schema.cc @@ -42,7 +42,12 @@ namespace parquet { const auto BOOL = std::make_shared(); const auto UINT8 = std::make_shared(); +const auto INT8 = std::make_shared(); +const auto UINT16 = std::make_shared(); +const auto INT16 = std::make_shared(); +const auto UINT32 = std::make_shared(); const auto INT32 = std::make_shared(); +const auto UINT64 = std::make_shared(); const auto INT64 = std::make_shared(); const auto FLOAT = std::make_shared(); const auto 
DOUBLE = std::make_shared(); @@ -92,6 +97,21 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) { case LogicalType::NONE: *out = INT32; break; + case LogicalType::UINT_8: + *out = UINT8; + break; + case LogicalType::INT_8: + *out = INT8; + break; + case LogicalType::UINT_16: + *out = UINT16; + break; + case LogicalType::INT_16: + *out = INT16; + break; + case LogicalType::UINT_32: + *out = UINT32; + break; case LogicalType::DECIMAL: *out = MakeDecimalType(node); break; @@ -107,6 +127,9 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) { case LogicalType::NONE: *out = INT64; break; + case LogicalType::UINT_64: + *out = UINT64; + break; case LogicalType::DECIMAL: *out = MakeDecimalType(node); break; @@ -187,20 +210,21 @@ Status FromParquetSchema( } Status StructToNode(const std::shared_ptr& type, const std::string& name, - bool nullable, NodePtr* out) { + bool nullable, const ::parquet::WriterProperties& properties, NodePtr* out) { Repetition::type repetition = Repetition::REQUIRED; if (nullable) { repetition = Repetition::OPTIONAL; } std::vector children(type->num_children()); for (int i = 0; i < type->num_children(); i++) { - RETURN_NOT_OK(FieldToNode(type->child(i), &children[i])); + RETURN_NOT_OK(FieldToNode(type->child(i), properties, &children[i])); } *out = GroupNode::Make(name, repetition, children); return Status::OK(); } -Status FieldToNode(const std::shared_ptr& field, NodePtr* out) { +Status FieldToNode(const std::shared_ptr& field, + const ::parquet::WriterProperties& properties, NodePtr* out) { LogicalType::type logical_type = LogicalType::NONE; ParquetType::type type; Repetition::type repetition = Repetition::REQUIRED; @@ -231,8 +255,12 @@ Status FieldToNode(const std::shared_ptr& field, NodePtr* out) { logical_type = LogicalType::INT_16; break; case Type::UINT32: - type = ParquetType::INT32; - logical_type = LogicalType::UINT_32; + if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) { + type = ParquetType::INT64; + } else { + type = ParquetType::INT32; + logical_type = LogicalType::UINT_32; + } break; case Type::INT32: type = ParquetType::INT32; @@ -277,7 +305,7 @@ Status FieldToNode(const std::shared_ptr& field, NodePtr* out) { break; case Type::STRUCT: { auto struct_type = std::static_pointer_cast(field->type); - return StructToNode(struct_type, field->name, field->nullable, out); + return StructToNode(struct_type, field->name, field->nullable, properties, out); } break; default: // TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR @@ -287,11 +315,12 @@ Status FieldToNode(const std::shared_ptr& field, NodePtr* out) { return Status::OK(); } -Status ToParquetSchema( - const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out) { +Status ToParquetSchema(const Schema* arrow_schema, + const ::parquet::WriterProperties& properties, + std::shared_ptr<::parquet::SchemaDescriptor>* out) { std::vector nodes(arrow_schema->num_fields()); for (int i = 0; i < arrow_schema->num_fields(); i++) { - RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), &nodes[i])); + RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i])); } NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); diff --git a/cpp/src/arrow/parquet/schema.h b/cpp/src/arrow/parquet/schema.h index ec5f96062e8..39bee059522 100644 --- a/cpp/src/arrow/parquet/schema.h +++ b/cpp/src/arrow/parquet/schema.h @@ -21,6 +21,7 @@ #include #include "parquet/api/schema.h" +#include "parquet/api/writer.h" 
#include "arrow/schema.h" #include "arrow/type.h" @@ -36,10 +37,12 @@ Status NodeToField(const ::parquet::schema::NodePtr& node, std::shared_ptr* out); -Status FieldToNode(const std::shared_ptr& field, ::parquet::schema::NodePtr* out); +Status FieldToNode(const std::shared_ptr& field, + const ::parquet::WriterProperties& properties, ::parquet::schema::NodePtr* out); -Status ToParquetSchema( - const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out); +Status ToParquetSchema(const Schema* arrow_schema, + const ::parquet::WriterProperties& properties, + std::shared_ptr<::parquet::SchemaDescriptor>* out); } // namespace parquet diff --git a/cpp/src/arrow/parquet/test-util.h b/cpp/src/arrow/parquet/test-util.h index cc8723bf6ec..68a7fb94c2a 100644 --- a/cpp/src/arrow/parquet/test-util.h +++ b/cpp/src/arrow/parquet/test-util.h @@ -18,26 +18,90 @@ #include #include +#include "arrow/test-util.h" #include "arrow/types/primitive.h" +#include "arrow/types/string.h" namespace arrow { namespace parquet { template -std::shared_ptr NonNullArray( - size_t size, typename ArrowType::c_type value) { - std::vector values(size, value); +using is_arrow_float = std::is_floating_point; + +template +using is_arrow_int = std::is_integral; + +template +using is_arrow_string = std::is_same; + +template +typename std::enable_if::value, + std::shared_ptr>::type +NonNullArray(size_t size) { + std::vector values; + ::arrow::test::random_real(size, 0, 0, 1, &values); NumericBuilder builder(default_memory_pool(), std::make_shared()); builder.Append(values.data(), values.size()); return std::static_pointer_cast(builder.Finish()); } -// This helper function only supports (size/2) nulls yet. +template +typename std::enable_if::value, + std::shared_ptr>::type +NonNullArray(size_t size) { + std::vector values; + ::arrow::test::randint(size, 0, 64, &values); + NumericBuilder builder(default_memory_pool(), std::make_shared()); + builder.Append(values.data(), values.size()); + return std::static_pointer_cast(builder.Finish()); +} + +template +typename std::enable_if::value, + std::shared_ptr>::type +NonNullArray(size_t size) { + StringBuilder builder(default_memory_pool(), std::make_shared()); + for (size_t i = 0; i < size; i++) { + builder.Append("test-string"); + } + return std::static_pointer_cast(builder.Finish()); +} + +template <> +std::shared_ptr NonNullArray(size_t size) { + std::vector values; + ::arrow::test::randint(size, 0, 1, &values); + BooleanBuilder builder(default_memory_pool(), std::make_shared()); + builder.Append(values.data(), values.size()); + return std::static_pointer_cast(builder.Finish()); +} + +// This helper function only supports (size/2) nulls. +template +typename std::enable_if::value, + std::shared_ptr>::type +NullableArray(size_t size, size_t num_nulls) { + std::vector values; + ::arrow::test::random_real(size, 0, 0, 1, &values); + std::vector valid_bytes(size, 1); + + for (size_t i = 0; i < num_nulls; i++) { + valid_bytes[i * 2] = 0; + } + + NumericBuilder builder(default_memory_pool(), std::make_shared()); + builder.Append(values.data(), values.size(), valid_bytes.data()); + return std::static_pointer_cast(builder.Finish()); +} + +// This helper function only supports (size/2) nulls. 
template -std::shared_ptr NullableArray( - size_t size, typename ArrowType::c_type value, size_t num_nulls) { - std::vector values(size, value); +typename std::enable_if::value, + std::shared_ptr>::type +NullableArray(size_t size, size_t num_nulls) { + std::vector values; + ::arrow::test::randint(size, 0, 64, &values); std::vector valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { @@ -49,14 +113,49 @@ std::shared_ptr NullableArray( return std::static_pointer_cast(builder.Finish()); } -std::shared_ptr MakeColumn(const std::string& name, - const std::shared_ptr& array, bool nullable) { +// This helper function only supports (size/2) nulls yet. +template +typename std::enable_if::value, + std::shared_ptr>::type +NullableArray(size_t size, size_t num_nulls) { + std::vector valid_bytes(size, 1); + + for (size_t i = 0; i < num_nulls; i++) { + valid_bytes[i * 2] = 0; + } + + StringBuilder builder(default_memory_pool(), std::make_shared()); + for (size_t i = 0; i < size; i++) { + builder.Append("test-string"); + } + return std::static_pointer_cast(builder.Finish()); +} + +// This helper function only supports (size/2) nulls yet. +template <> +std::shared_ptr NullableArray( + size_t size, size_t num_nulls) { + std::vector values; + ::arrow::test::randint(size, 0, 1, &values); + std::vector valid_bytes(size, 1); + + for (size_t i = 0; i < num_nulls; i++) { + valid_bytes[i * 2] = 0; + } + + BooleanBuilder builder(default_memory_pool(), std::make_shared()); + builder.Append(values.data(), values.size(), valid_bytes.data()); + return std::static_pointer_cast(builder.Finish()); +} + +std::shared_ptr MakeColumn( + const std::string& name, const std::shared_ptr& array, bool nullable) { auto field = std::make_shared(name, array->type(), nullable); return std::make_shared(field, array); } std::shared_ptr
MakeSimpleTable( - const std::shared_ptr& values, bool nullable) { + const std::shared_ptr& values, bool nullable) { std::shared_ptr column = MakeColumn("col", values, nullable); std::vector> columns({column}); std::vector> fields({column->field()}); @@ -72,6 +171,23 @@ void ExpectArray(T* expected, Array* result) { } } +template +void ExpectArray(typename ArrowType::c_type* expected, Array* result) { + PrimitiveArray* p_array = static_cast(result); + for (int64_t i = 0; i < result->length(); i++) { + EXPECT_EQ(expected[i], + reinterpret_cast(p_array->data()->data())[i]); + } +} + +template <> +void ExpectArray(uint8_t* expected, Array* result) { + BooleanBuilder builder(default_memory_pool(), std::make_shared()); + builder.Append(expected, result->length()); + std::shared_ptr expected_array = builder.Finish(); + EXPECT_TRUE(result->Equals(expected_array)); +} + } // namespace parquet } // namespace arrow diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc index 4005e3b2b0c..63449bb20b1 100644 --- a/cpp/src/arrow/parquet/writer.cc +++ b/cpp/src/arrow/parquet/writer.cc @@ -25,11 +25,13 @@ #include "arrow/table.h" #include "arrow/types/construct.h" #include "arrow/types/primitive.h" +#include "arrow/types/string.h" #include "arrow/parquet/schema.h" #include "arrow/parquet/utils.h" #include "arrow/util/status.h" using parquet::ParquetFileWriter; +using parquet::ParquetVersion; using parquet::schema::GroupNode; namespace arrow { @@ -41,10 +43,40 @@ class FileWriter::Impl { Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer); Status NewRowGroup(int64_t chunk_size); - template + template Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data, int64_t offset, int64_t length); + + // TODO(uwe): Same code as in reader.cc the only difference is the name of the temporary + // buffer + template + struct can_copy_ptr { + static constexpr bool value = + std::is_same::value || + (std::is_integral{} && std::is_integral{} && + (sizeof(InType) == sizeof(OutType))); + }; + + template ::value>::type* = nullptr> + Status ConvertPhysicalType(const InType* in_ptr, int64_t, const OutType** out_ptr) { + *out_ptr = reinterpret_cast(in_ptr); + return Status::OK(); + } + + template ::value>::type* = nullptr> + Status ConvertPhysicalType( + const InType* in_ptr, int64_t length, const OutType** out_ptr) { + RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType))); + OutType* mutable_out_ptr = reinterpret_cast(data_buffer_.mutable_data()); + std::copy(in_ptr, in_ptr + length, mutable_out_ptr); + *out_ptr = mutable_out_ptr; + return Status::OK(); + } + Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length); + Status WriteFlatColumnChunk(const StringArray* data, int64_t offset, int64_t length); Status Close(); virtual ~Impl() {} @@ -53,6 +85,8 @@ class FileWriter::Impl { friend class FileWriter; MemoryPool* pool_; + // Buffer used for storing the data of an array converted to the physical type + // as expected by parquet-cpp. 
PoolBuffer data_buffer_; PoolBuffer def_levels_buffer_; std::unique_ptr<::parquet::ParquetFileWriter> writer_; @@ -72,36 +106,95 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) { return Status::OK(); } -template +template Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset, int64_t length) { - // TODO: DCHECK((offset + length) <= data->length()); - auto data_ptr = - reinterpret_cast(data->data()->data()) + - offset; + using ArrowCType = typename ArrowType::c_type; + using ParquetCType = typename ParquetType::c_type; + + DCHECK((offset + length) <= data->length()); + auto data_ptr = reinterpret_cast(data->data()->data()) + offset; auto writer = reinterpret_cast<::parquet::TypedColumnWriter*>(column_writer); if (writer->descr()->max_definition_level() == 0) { // no nulls, just dump the data - PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_ptr)); + const ParquetCType* data_writer_ptr; + RETURN_NOT_OK((ConvertPhysicalType( + data_ptr, length, &data_writer_ptr))); + PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_writer_ptr)); } else if (writer->descr()->max_definition_level() == 1) { RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t))); int16_t* def_levels_ptr = reinterpret_cast(def_levels_buffer_.mutable_data()); if (data->null_count() == 0) { std::fill(def_levels_ptr, def_levels_ptr + length, 1); - PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, data_ptr)); + const ParquetCType* data_writer_ptr; + RETURN_NOT_OK((ConvertPhysicalType( + data_ptr, length, &data_writer_ptr))); + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(length, def_levels_ptr, nullptr, data_writer_ptr)); } else { - RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(typename ParquetType::c_type))); - auto buffer_ptr = - reinterpret_cast(data_buffer_.mutable_data()); + RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType))); + auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); int buffer_idx = 0; for (int i = 0; i < length; i++) { if (data->IsNull(offset + i)) { def_levels_ptr[i] = 0; } else { def_levels_ptr[i] = 1; - buffer_ptr[buffer_idx++] = data_ptr[i]; + buffer_ptr[buffer_idx++] = static_cast(data_ptr[i]); + } + } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); + } + } else { + return Status::NotImplemented("no support for max definition level > 1 yet"); + } + PARQUET_CATCH_NOT_OK(writer->Close()); + return Status::OK(); +} + +// This specialization seems quite similar but it significantly differs in two points: +// * offset is added at the most latest time to the pointer as we have sub-byte access +// * Arrow data is stored bitwise thus we cannot use std::copy to transform from +// ArrowType::c_type to ParquetType::c_type +template <> +Status FileWriter::Impl::TypedWriteBatch<::parquet::BooleanType, BooleanType>( + ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset, + int64_t length) { + DCHECK((offset + length) <= data->length()); + RETURN_NOT_OK(data_buffer_.Resize(length)); + auto data_ptr = reinterpret_cast(data->data()->data()); + auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); + auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::BooleanType>*>( + column_writer); + if (writer->descr()->max_definition_level() == 0) { + // no nulls, just dump the data + for (int64_t i = 0; i < length; i++) { + buffer_ptr[i] = 
util::get_bit(data_ptr, offset + i); + } + PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, buffer_ptr)); + } else if (writer->descr()->max_definition_level() == 1) { + RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t))); + int16_t* def_levels_ptr = + reinterpret_cast(def_levels_buffer_.mutable_data()); + if (data->null_count() == 0) { + std::fill(def_levels_ptr, def_levels_ptr + length, 1); + for (int64_t i = 0; i < length; i++) { + buffer_ptr[i] = util::get_bit(data_ptr, offset + i); + } + // TODO(PARQUET-644): write boolean values as a packed bitmap + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); + } else { + int buffer_idx = 0; + for (int i = 0; i < length; i++) { + if (data->IsNull(offset + i)) { + def_levels_ptr[i] = 0; + } else { + def_levels_ptr[i] = 1; + buffer_ptr[buffer_idx++] = util::get_bit(data_ptr, offset + i); } } PARQUET_CATCH_NOT_OK( @@ -120,9 +213,9 @@ Status FileWriter::Impl::Close() { return Status::OK(); } -#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \ - case Type::ENUM: \ - return TypedWriteBatch(writer, data, offset, length); \ +#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \ + case Type::ENUM: \ + return TypedWriteBatch(writer, data, offset, length); \ break; Status FileWriter::Impl::WriteFlatColumnChunk( @@ -130,15 +223,76 @@ Status FileWriter::Impl::WriteFlatColumnChunk( ::parquet::ColumnWriter* writer; PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn()); switch (data->type_enum()) { - TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type) - TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type) - TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType) - TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType) + TYPED_BATCH_CASE(BOOL, BooleanType, ::parquet::BooleanType) + TYPED_BATCH_CASE(UINT8, UInt8Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(INT8, Int8Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(UINT16, UInt16Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(INT16, Int16Type, ::parquet::Int32Type) + case Type::UINT32: + if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) { + // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need + // to use the larger Int64Type to store them lossless. 
+ return TypedWriteBatch<::parquet::Int64Type, UInt32Type>( + writer, data, offset, length); + } else { + return TypedWriteBatch<::parquet::Int32Type, UInt32Type>( + writer, data, offset, length); + } + TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(UINT64, UInt64Type, ::parquet::Int64Type) + TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type) + TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType) + TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType) default: return Status::NotImplemented(data->type()->ToString()); } } +Status FileWriter::Impl::WriteFlatColumnChunk( + const StringArray* data, int64_t offset, int64_t length) { + ::parquet::ColumnWriter* column_writer; + PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn()); + DCHECK((offset + length) <= data->length()); + RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(::parquet::ByteArray))); + auto buffer_ptr = reinterpret_cast<::parquet::ByteArray*>(data_buffer_.mutable_data()); + auto values = std::dynamic_pointer_cast(data->values()); + auto data_ptr = reinterpret_cast(values->data()->data()); + DCHECK(values != nullptr); + auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::ByteArrayType>*>( + column_writer); + if (writer->descr()->max_definition_level() > 0) { + RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t))); + } + int16_t* def_levels_ptr = reinterpret_cast(def_levels_buffer_.mutable_data()); + if (writer->descr()->max_definition_level() == 0 || data->null_count() == 0) { + // no nulls, just dump the data + for (int64_t i = 0; i < length; i++) { + buffer_ptr[i] = ::parquet::ByteArray( + data->value_length(i + offset), data_ptr + data->value_offset(i)); + } + if (writer->descr()->max_definition_level() > 0) { + std::fill(def_levels_ptr, def_levels_ptr + length, 1); + } + PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); + } else if (writer->descr()->max_definition_level() == 1) { + int buffer_idx = 0; + for (int64_t i = 0; i < length; i++) { + if (data->IsNull(offset + i)) { + def_levels_ptr[i] = 0; + } else { + def_levels_ptr[i] = 1; + buffer_ptr[buffer_idx++] = ::parquet::ByteArray( + data->value_length(i + offset), data_ptr + data->value_offset(i + offset)); + } + } + PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); + } else { + return Status::NotImplemented("no support for max definition level > 1 yet"); + } + PARQUET_CATCH_NOT_OK(writer->Close()); + return Status::OK(); +} + FileWriter::FileWriter( MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer) : impl_(new FileWriter::Impl(pool, std::move(writer))) {} @@ -148,10 +302,20 @@ Status FileWriter::NewRowGroup(int64_t chunk_size) { } Status FileWriter::WriteFlatColumnChunk( - const PrimitiveArray* data, int64_t offset, int64_t length) { + const Array* array, int64_t offset, int64_t length) { int64_t real_length = length; - if (length == -1) { real_length = data->length(); } - return impl_->WriteFlatColumnChunk(data, offset, real_length); + if (length == -1) { real_length = array->length(); } + if (array->type_enum() == Type::STRING) { + auto string_array = dynamic_cast(array); + DCHECK(string_array); + return impl_->WriteFlatColumnChunk(string_array, offset, real_length); + } else { + auto primitive_array = dynamic_cast(array); + if (!primitive_array) { + return Status::NotImplemented("Table must consist of PrimitiveArray instances"); + } + return 
impl_->WriteFlatColumnChunk(primitive_array, offset, real_length); + } } Status FileWriter::Close() { @@ -165,40 +329,30 @@ MemoryPool* FileWriter::memory_pool() const { FileWriter::~FileWriter() {} Status WriteFlatTable(const Table* table, MemoryPool* pool, - std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size) { + const std::shared_ptr<::parquet::OutputStream>& sink, int64_t chunk_size, + const std::shared_ptr<::parquet::WriterProperties>& properties) { std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema; - RETURN_NOT_OK(ToParquetSchema(table->schema().get(), &parquet_schema)); + RETURN_NOT_OK( + ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema)); auto schema_node = std::static_pointer_cast(parquet_schema->schema()); std::unique_ptr parquet_writer = - ParquetFileWriter::Open(sink, schema_node); + ParquetFileWriter::Open(sink, schema_node, properties); FileWriter writer(pool, std::move(parquet_writer)); - // TODO: Support writing chunked arrays. + // TODO(ARROW-232) Support writing chunked arrays. for (int i = 0; i < table->num_columns(); i++) { if (table->column(i)->data()->num_chunks() != 1) { return Status::NotImplemented("No support for writing chunked arrays yet."); } } - // Cast to PrimitiveArray instances as we work with them. - std::vector> arrays(table->num_columns()); - for (int i = 0; i < table->num_columns(); i++) { - // num_chunks == 1 as per above loop - std::shared_ptr array = table->column(i)->data()->chunk(0); - auto primitive_array = std::dynamic_pointer_cast(array); - if (!primitive_array) { - PARQUET_IGNORE_NOT_OK(writer.Close()); - return Status::NotImplemented("Table must consist of PrimitiveArray instances"); - } - arrays[i] = primitive_array; - } - for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) { int64_t offset = chunk * chunk_size; int64_t size = std::min(chunk_size, table->num_rows() - offset); RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close())); for (int i = 0; i < table->num_columns(); i++) { - RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(arrays[i].get(), offset, size), + std::shared_ptr array = table->column(i)->data()->chunk(0); + RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(array.get(), offset, size), PARQUET_IGNORE_NOT_OK(writer.Close())); } } diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h index 93693f51184..cfd80d80b79 100644 --- a/cpp/src/arrow/parquet/writer.h +++ b/cpp/src/arrow/parquet/writer.h @@ -25,10 +25,12 @@ namespace arrow { +class Array; class MemoryPool; class PrimitiveArray; class RowBatch; class Status; +class StringArray; class Table; namespace parquet { @@ -43,8 +45,7 @@ class FileWriter { FileWriter(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer); Status NewRowGroup(int64_t chunk_size); - Status WriteFlatColumnChunk( - const PrimitiveArray* data, int64_t offset = 0, int64_t length = -1); + Status WriteFlatColumnChunk(const Array* data, int64_t offset = 0, int64_t length = -1); Status Close(); virtual ~FileWriter(); @@ -62,7 +63,9 @@ class FileWriter { * The table shall only consist of nullable, non-repeated columns of primitive type. 
*/ Status WriteFlatTable(const Table* table, MemoryPool* pool, - std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size); + const std::shared_ptr<::parquet::OutputStream>& sink, int64_t chunk_size, + const std::shared_ptr<::parquet::WriterProperties>& properties = + ::parquet::default_writer_properties()); } // namespace parquet diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index 2f81161d1d6..055dac74444 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -50,6 +50,8 @@ if (!s.ok()) { FAIL() << s.ToString(); } \ } while (0) +#define ASSERT_OK_NO_THROW(expr) ASSERT_NO_THROW(ASSERT_OK(expr)) + #define EXPECT_OK(expr) \ do { \ Status s = (expr); \ diff --git a/cpp/src/arrow/types/primitive.cc b/cpp/src/arrow/types/primitive.cc index 08fc8478e6d..f4b47f9d2f5 100644 --- a/cpp/src/arrow/types/primitive.cc +++ b/cpp/src/arrow/types/primitive.cc @@ -133,6 +133,11 @@ Status PrimitiveBuilder::Append( RETURN_NOT_OK(Reserve(length)); for (int i = 0; i < length; ++i) { + // Skip reading from unitialised memory + // TODO: This actually is only to keep valgrind happy but may or may not + // have a performance impact. + if ((valid_bytes != nullptr) && !valid_bytes[i]) continue; + if (values[i] > 0) { util::set_bit(raw_data_, length_ + i); } else { diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd index 0918344070e..a2f83ea5ea5 100644 --- a/python/pyarrow/includes/parquet.pxd +++ b/python/pyarrow/includes/parquet.pxd @@ -32,6 +32,10 @@ cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil: pass cdef extern from "parquet/api/schema.h" namespace "parquet" nogil: + enum ParquetVersion" parquet::ParquetVersion::type": + PARQUET_1_0" parquet::ParquetVersion::PARQUET_1_0" + PARQUET_2_0" parquet::ParquetVersion::PARQUET_2_0" + cdef cppclass SchemaDescriptor: shared_ptr[Node] schema() GroupNode* group() @@ -80,6 +84,11 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: LocalFileOutputStream(const c_string& path) void Close() + cdef cppclass WriterProperties: + cppclass Builder: + Builder* version(ParquetVersion version) + shared_ptr[WriterProperties] build() + cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil: cdef cppclass FileReader: @@ -93,5 +102,7 @@ cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil: cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil: - cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool, shared_ptr[OutputStream] sink, int64_t chunk_size) + cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool, + const shared_ptr[OutputStream]& sink, int64_t chunk_size, + const shared_ptr[WriterProperties]& properties) diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx index 3d5355ebe43..0b2b2088033 100644 --- a/python/pyarrow/parquet.pyx +++ b/python/pyarrow/parquet.pyx @@ -24,6 +24,7 @@ cimport pyarrow.includes.pyarrow as pyarrow from pyarrow.includes.parquet cimport * from pyarrow.compat import tobytes +from pyarrow.error import ArrowException from pyarrow.error cimport check_cstatus from pyarrow.table cimport Table @@ -42,11 +43,13 @@ def read_table(filename, columns=None): # in Cython (due to missing rvalue support) reader = unique_ptr[FileReader](new FileReader(default_memory_pool(), ParquetFileReader.OpenFile(tobytes(filename)))) - check_cstatus(reader.get().ReadFlatTable(&ctable)) + with nogil: + check_cstatus(reader.get().ReadFlatTable(&ctable)) + 
table.init(ctable) return table -def write_table(table, filename, chunk_size=None): +def write_table(table, filename, chunk_size=None, version=None): """ Write a Table to Parquet format @@ -56,16 +59,29 @@ def write_table(table, filename, chunk_size=None): filename : string chunk_size : int The maximum number of rows in each Parquet RowGroup + version : {"1.0", "2.0"}, default "1.0" + The Parquet format version, defaults to 1.0 """ cdef Table table_ = table cdef CTable* ctable_ = table_.table cdef shared_ptr[OutputStream] sink + cdef WriterProperties.Builder properties_builder cdef int64_t chunk_size_ = 0 if chunk_size is None: chunk_size_ = min(ctable_.num_rows(), int(2**16)) else: chunk_size_ = chunk_size + if version is not None: + if version == "1.0": + properties_builder.version(PARQUET_1_0) + elif version == "2.0": + properties_builder.version(PARQUET_2_0) + else: + raise ArrowException("Unsupported Parquet format version") + sink.reset(new LocalFileOutputStream(tobytes(filename))) - check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink, chunk_size_)) + with nogil: + check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink, + chunk_size_, properties_builder.build())) diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index d92cf4ca656..de9cfbb46e1 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -42,18 +42,55 @@ def test_single_pylist_column_roundtrip(tmpdir): data_read = col_read.data.chunk(0) assert data_written.equals(data_read) -def test_pandas_rountrip(tmpdir): +def test_pandas_parquet_2_0_rountrip(tmpdir): size = 10000 + np.random.seed(0) df = pd.DataFrame({ + 'uint8': np.arange(size, dtype=np.uint8), + 'uint16': np.arange(size, dtype=np.uint16), + 'uint32': np.arange(size, dtype=np.uint32), + 'uint64': np.arange(size, dtype=np.uint64), + 'int8': np.arange(size, dtype=np.int16), + 'int16': np.arange(size, dtype=np.int16), 'int32': np.arange(size, dtype=np.int32), 'int64': np.arange(size, dtype=np.int64), 'float32': np.arange(size, dtype=np.float32), - 'float64': np.arange(size, dtype=np.float64) + 'float64': np.arange(size, dtype=np.float64), + 'bool': np.random.randn(size) > 0, + 'str': [str(x) for x in range(size)], + 'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None] }) filename = tmpdir.join('pandas_rountrip.parquet') arrow_table = A.from_pandas_dataframe(df) - A.parquet.write_table(arrow_table, filename.strpath) + A.parquet.write_table(arrow_table, filename.strpath, version="2.0") table_read = pyarrow.parquet.read_table(filename.strpath) df_read = table_read.to_pandas() pdt.assert_frame_equal(df, df_read) +def test_pandas_parquet_1_0_rountrip(tmpdir): + size = 10000 + np.random.seed(0) + df = pd.DataFrame({ + 'uint8': np.arange(size, dtype=np.uint8), + 'uint16': np.arange(size, dtype=np.uint16), + 'uint32': np.arange(size, dtype=np.uint32), + 'uint64': np.arange(size, dtype=np.uint64), + 'int8': np.arange(size, dtype=np.int16), + 'int16': np.arange(size, dtype=np.int16), + 'int32': np.arange(size, dtype=np.int32), + 'int64': np.arange(size, dtype=np.int64), + 'float32': np.arange(size, dtype=np.float32), + 'float64': np.arange(size, dtype=np.float64), + 'bool': np.random.randn(size) > 0 + }) + filename = tmpdir.join('pandas_rountrip.parquet') + arrow_table = A.from_pandas_dataframe(df) + A.parquet.write_table(arrow_table, filename.strpath, version="1.0") + table_read = pyarrow.parquet.read_table(filename.strpath) + df_read = table_read.to_pandas() + 
+ + # uint32_t values are written as int64_t for Parquet version 1.0, so cast + # the expected column before comparing + df['uint32'] = df['uint32'].values.astype(np.int64) + + pdt.assert_frame_equal(df, df_read) +
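Usage note, not part of the patch itself: the short sketch below exercises the new `version` argument of pyarrow.parquet.write_table introduced above and illustrates the uint32 behaviour that the tests assert. The file path, column contents, and the print calls are hypothetical illustration only; the API calls (from_pandas_dataframe, write_table, read_table, to_pandas) are the ones used in test_parquet.py above.

    import numpy as np
    import pandas as pd
    import pyarrow as A
    import pyarrow.parquet

    # Hypothetical example data and output path.
    df = pd.DataFrame({'uint32': np.arange(100, dtype=np.uint32)})
    table = A.from_pandas_dataframe(df)
    path = 'example.parquet'

    # Parquet 2.0 keeps the UINT_32 logical type, so the column round-trips as uint32.
    A.parquet.write_table(table, path, version='2.0')
    print(pyarrow.parquet.read_table(path).to_pandas().dtypes)

    # Parquet 1.0 has no unsigned 32-bit logical type; the column is written as
    # INT64 and therefore read back as int64.
    A.parquet.write_table(table, path, version='1.0')
    print(pyarrow.parquet.read_table(path).to_pandas().dtypes)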