From 8e8d7d71e58d9ac9711687f090f56910d16acfeb Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Mon, 16 May 2016 20:18:41 +0200 Subject: [PATCH 01/10] ARROW-201: [C++] Initial ParquetWriter implementation --- cpp/src/arrow/parquet/CMakeLists.txt | 5 +- cpp/src/arrow/parquet/parquet-io-test.cc | 225 +++++++++++++++++++ cpp/src/arrow/parquet/parquet-reader-test.cc | 116 ---------- cpp/src/arrow/parquet/reader.cc | 33 ++- cpp/src/arrow/parquet/writer.cc | 143 ++++++++++++ cpp/src/arrow/parquet/writer.h | 59 +++++ 6 files changed, 457 insertions(+), 124 deletions(-) create mode 100644 cpp/src/arrow/parquet/parquet-io-test.cc delete mode 100644 cpp/src/arrow/parquet/parquet-reader-test.cc create mode 100644 cpp/src/arrow/parquet/writer.cc create mode 100644 cpp/src/arrow/parquet/writer.h diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt index cd6f05d6b5f..7254c485d33 100644 --- a/cpp/src/arrow/parquet/CMakeLists.txt +++ b/cpp/src/arrow/parquet/CMakeLists.txt @@ -21,6 +21,7 @@ set(PARQUET_SRCS reader.cc schema.cc + writer.cc ) set(PARQUET_LIBS @@ -37,8 +38,8 @@ SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX) ADD_ARROW_TEST(parquet-schema-test) ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet) -ADD_ARROW_TEST(parquet-reader-test) -ARROW_TEST_LINK_LIBRARIES(parquet-reader-test arrow_parquet) +ADD_ARROW_TEST(parquet-io-test) +ARROW_TEST_LINK_LIBRARIES(parquet-io-test arrow_parquet) # Headers: top level install(FILES diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc new file mode 100644 index 00000000000..17f90bcaf03 --- /dev/null +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gtest/gtest.h" + +#include "arrow/test-util.h" +#include "arrow/parquet/reader.h" +#include "arrow/parquet/writer.h" +#include "arrow/types/primitive.h" +#include "arrow/util/memory-pool.h" +#include "arrow/util/status.h" + +#include "parquet/api/reader.h" +#include "parquet/api/writer.h" + +using ParquetBuffer = parquet::Buffer; +using parquet::BufferReader; +using parquet::InMemoryOutputStream; +using parquet::ParquetFileReader; +using parquet::ParquetFileWriter; +using parquet::RandomAccessSource; +using parquet::Repetition; +using parquet::SchemaDescriptor; +using ParquetType = parquet::Type; +using parquet::schema::GroupNode; +using parquet::schema::NodePtr; +using parquet::schema::PrimitiveNode; + +namespace arrow { + +namespace parquet { + +class TestParquetIO : public ::testing::Test { + public: + virtual void SetUp() {} + + std::shared_ptr Int64Schema() { + auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64); + NodePtr node_ = + GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); + return std::static_pointer_cast(node_); + } + + std::shared_ptr DoubleSchema() { + auto pnode = PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE); + NodePtr node_ = + GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); + return std::static_pointer_cast(node_); + } + + std::shared_ptr DoubleValueArray( + size_t size, double value, size_t num_nulls) { + std::vector values(size, value); + std::vector valid_bytes(size, 1); + + for (size_t i = 0; i < num_nulls; i++) { + valid_bytes[i * 2] = 0; + } + + DoubleBuilder builder(default_memory_pool(), std::make_shared()); + builder.Append(values.data(), values.size(), valid_bytes.data()); + return std::static_pointer_cast(builder.Finish()); + } + + std::shared_ptr Int64ValueArray(size_t size, int64_t value) { + std::vector values(size, value); + Int64Builder builder(default_memory_pool(), std::make_shared()); + builder.Append(values.data(), values.size()); + return std::static_pointer_cast(builder.Finish()); + } + + std::unique_ptr Int64FileWriter() { + std::shared_ptr schema = Int64Schema(); + sink_ = std::make_shared(); + return ParquetFileWriter::Open(sink_, schema); + } + + std::unique_ptr DoubleFileWriter() { + std::shared_ptr schema = DoubleSchema(); + sink_ = std::make_shared(); + return ParquetFileWriter::Open(sink_, schema); + } + + std::unique_ptr ReaderFromSink() { + std::shared_ptr buffer = sink_->GetBuffer(); + std::unique_ptr source(new BufferReader(buffer)); + return ParquetFileReader::Open(std::move(source)); + } + + void ReadSingleColumnFile( + std::unique_ptr file_reader, std::shared_ptr* out) { + arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); + std::unique_ptr column_reader; + ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader))); + ASSERT_NE(nullptr, column_reader.get()); + ASSERT_OK(column_reader->NextBatch(100, out)); + ASSERT_NE(nullptr, out->get()); + } + + std::unique_ptr Int64File( + std::vector& values, int num_chunks) { + std::unique_ptr file_writer = Int64FileWriter(); + size_t chunk_size = values.size() / num_chunks; + for (int i = 0; i < num_chunks; i++) { + auto row_group_writer = file_writer->AppendRowGroup(chunk_size); + auto column_writer = + static_cast<::parquet::Int64Writer*>(row_group_writer->NextColumn()); + int64_t* data = values.data() + i * chunk_size; + column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); + column_writer->Close(); + row_group_writer->Close(); + } + file_writer->Close(); + return ReaderFromSink(); + } + + private: + std::shared_ptr sink_; +}; + +TEST_F(TestParquetIO, SingleColumnInt64Read) { + std::vector values(100, 128); + std::unique_ptr file_reader = Int64File(values, 1); + + std::shared_ptr out; + ReadSingleColumnFile(std::move(file_reader), &out); + + Int64Array* out_array = static_cast(out.get()); + for (size_t i = 0; i < values.size(); i++) { + EXPECT_EQ(values[i], out_array->raw_data()[i]); + } +} + +TEST_F(TestParquetIO, SingleColumnInt64ChunkedRead) { + std::vector values(100, 128); + std::unique_ptr file_reader = Int64File(values, 4); + + std::shared_ptr out; + ReadSingleColumnFile(std::move(file_reader), &out); + + Int64Array* out_array = static_cast(out.get()); + for (size_t i = 0; i < values.size(); i++) { + EXPECT_EQ(values[i], out_array->raw_data()[i]); + } +} + +TEST_F(TestParquetIO, SingleColumnInt64Write) { + std::shared_ptr values = Int64ValueArray(100, 128); + + FileWriter writer(default_memory_pool(), Int64FileWriter()); + ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length()))); + ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get()))); + ASSERT_NO_THROW(ASSERT_OK(writer.Close())); + + std::shared_ptr out; + ReadSingleColumnFile(ReaderFromSink(), &out); + ASSERT_TRUE(values->Equals(out)); +} + +TEST_F(TestParquetIO, SingleColumnDoubleReadWrite) { + // This also tests max_definition_level = 1 + std::shared_ptr values = DoubleValueArray(100, 128, 10); + + FileWriter writer(default_memory_pool(), DoubleFileWriter()); + ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length()))); + ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get()))); + ASSERT_NO_THROW(ASSERT_OK(writer.Close())); + + std::shared_ptr out; + ReadSingleColumnFile(ReaderFromSink(), &out); + ASSERT_TRUE(values->Equals(out)); +} + +TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) { + std::shared_ptr values = Int64ValueArray(100, 128); + std::shared_ptr values_chunk = Int64ValueArray(25, 128); + + FileWriter writer(default_memory_pool(), Int64FileWriter()); + for (int i = 0; i < 4; i++) { + ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length()))); + ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get()))); + } + ASSERT_NO_THROW(ASSERT_OK(writer.Close())); + + std::shared_ptr out; + ReadSingleColumnFile(ReaderFromSink(), &out); + ASSERT_TRUE(values->Equals(out)); +} + +TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) { + std::shared_ptr values = DoubleValueArray(100, 128, 10); + std::shared_ptr values_chunk_nulls = DoubleValueArray(25, 128, 10); + std::shared_ptr values_chunk = DoubleValueArray(25, 128, 0); + + FileWriter writer(default_memory_pool(), DoubleFileWriter()); + ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length()))); + ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get()))); + for (int i = 0; i < 3; i++) { + ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length()))); + ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get()))); + } + ASSERT_NO_THROW(ASSERT_OK(writer.Close())); + + std::shared_ptr out; + ReadSingleColumnFile(ReaderFromSink(), &out); + ASSERT_TRUE(values->Equals(out)); +} + +} // namespace parquet + +} // namespace arrow diff --git a/cpp/src/arrow/parquet/parquet-reader-test.cc b/cpp/src/arrow/parquet/parquet-reader-test.cc deleted file mode 100644 index a7fc2a89f5f..00000000000 --- a/cpp/src/arrow/parquet/parquet-reader-test.cc +++ /dev/null @@ -1,116 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "gtest/gtest.h" - -#include "arrow/test-util.h" -#include "arrow/parquet/reader.h" -#include "arrow/types/primitive.h" -#include "arrow/util/memory-pool.h" -#include "arrow/util/status.h" - -#include "parquet/api/reader.h" -#include "parquet/api/writer.h" - -using ParquetBuffer = parquet::Buffer; -using parquet::BufferReader; -using parquet::InMemoryOutputStream; -using parquet::Int64Writer; -using parquet::ParquetFileReader; -using parquet::ParquetFileWriter; -using parquet::RandomAccessSource; -using parquet::Repetition; -using parquet::SchemaDescriptor; -using ParquetType = parquet::Type; -using parquet::schema::GroupNode; -using parquet::schema::NodePtr; -using parquet::schema::PrimitiveNode; - -namespace arrow { - -namespace parquet { - -class TestReadParquet : public ::testing::Test { - public: - virtual void SetUp() {} - - std::shared_ptr Int64Schema() { - auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64); - NodePtr node_ = - GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); - return std::static_pointer_cast(node_); - } - - std::unique_ptr Int64File( - std::vector& values, int num_chunks) { - std::shared_ptr schema = Int64Schema(); - std::shared_ptr sink(new InMemoryOutputStream()); - auto file_writer = ParquetFileWriter::Open(sink, schema); - size_t chunk_size = values.size() / num_chunks; - for (int i = 0; i < num_chunks; i++) { - auto row_group_writer = file_writer->AppendRowGroup(chunk_size); - auto column_writer = static_cast(row_group_writer->NextColumn()); - int64_t* data = values.data() + i * chunk_size; - column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); - column_writer->Close(); - row_group_writer->Close(); - } - file_writer->Close(); - - std::shared_ptr buffer = sink->GetBuffer(); - std::unique_ptr source(new BufferReader(buffer)); - return ParquetFileReader::Open(std::move(source)); - } - - private: -}; - -TEST_F(TestReadParquet, SingleColumnInt64) { - std::vector values(100, 128); - std::unique_ptr file_reader = Int64File(values, 1); - arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); - std::unique_ptr column_reader; - ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader))); - ASSERT_NE(nullptr, column_reader.get()); - std::shared_ptr out; - ASSERT_OK(column_reader->NextBatch(100, &out)); - ASSERT_NE(nullptr, out.get()); - Int64Array* out_array = static_cast(out.get()); - for (size_t i = 0; i < values.size(); i++) { - EXPECT_EQ(values[i], out_array->raw_data()[i]); - } -} - -TEST_F(TestReadParquet, SingleColumnInt64Chunked) { - std::vector values(100, 128); - std::unique_ptr file_reader = Int64File(values, 4); - arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); - std::unique_ptr column_reader; - ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader))); - ASSERT_NE(nullptr, column_reader.get()); - std::shared_ptr out; - ASSERT_OK(column_reader->NextBatch(100, &out)); - ASSERT_NE(nullptr, out.get()); - Int64Array* out_array = static_cast(out.get()); - for (size_t i = 0; i < values.size(); i++) { - EXPECT_EQ(values[i], out_array->raw_data()[i]); - } -} - -} // namespace parquet - -} // namespace arrow diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index 481ded5789a..22f255c5e3b 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -130,16 +130,37 @@ Status FlatColumnReader::Impl::TypedReadBatch( } auto reader = dynamic_cast*>(column_reader_.get()); int64_t values_read; + int64_t levels_read; + int16_t* def_levels = reinterpret_cast(def_levels_buffer_.mutable_data()); + int16_t* rep_levels = reinterpret_cast(rep_levels_buffer_.mutable_data()); CType* values = reinterpret_cast(values_buffer_.mutable_data()); - PARQUET_CATCH_NOT_OK( - values_to_read -= reader->ReadBatch(values_to_read, - reinterpret_cast(def_levels_buffer_.mutable_data()), - reinterpret_cast(rep_levels_buffer_.mutable_data()), values, - &values_read)); + PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(values_to_read, def_levels, + rep_levels, values, &values_read)); + values_to_read -= levels_read; if (descr_->max_definition_level() == 0) { RETURN_NOT_OK(builder.Append(values, values_read)); + } else if (descr_->max_definition_level() == 1) { + CType* values_start = values; + int num_values = 0; + for (int64_t i = 0; i < levels_read; i++) { + if (def_levels[i] < descr_->max_definition_level()) { + if (num_values > 0) { + // Bulk copy non-null values + RETURN_NOT_OK(builder.Append(values_start, num_values)); + values_start += num_values; + num_values = 0; + } + RETURN_NOT_OK(builder.AppendNull()); + } else { + num_values++; + } + } + if (num_values > 0) { + // Bulk copy non-null values + RETURN_NOT_OK(builder.Append(values_start, num_values)); + } } else { - return Status::NotImplemented("no support for definition levels yet"); + return Status::NotImplemented("no support for max definition level > 1 yet"); } if (!column_reader_->HasNext()) { NextRowGroup(); } } diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc new file mode 100644 index 00000000000..d9042fa3652 --- /dev/null +++ b/cpp/src/arrow/parquet/writer.cc @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/parquet/writer.h" + +#include "arrow/array.h" +#include "arrow/types/primitive.h" +#include "arrow/parquet/utils.h" +#include "arrow/util/status.h" + +namespace arrow { + +namespace parquet { + +class FileWriter::Impl { + public: + Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer); + + Status NewRowGroup(int64_t chunk_size); + template + Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data); + Status WriteFlatColumnChunk(const PrimitiveArray* data); + Status Close(); + + virtual ~Impl() {} + + private: + MemoryPool* pool_; + PoolBuffer data_buffer_; + PoolBuffer def_levels_buffer_; + std::unique_ptr<::parquet::ParquetFileWriter> writer_; + ::parquet::RowGroupWriter* row_group_writer_; +}; + +FileWriter::Impl::Impl( + MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer) + : pool_(pool), + data_buffer_(pool), + writer_(std::move(writer)), + row_group_writer_(nullptr) {} + +Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) { + if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); } + PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup(chunk_size)); + return Status::OK(); +} + +template +Status FileWriter::Impl::TypedWriteBatch( + ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data) { + auto data_ptr = reinterpret_cast(data->data()->data()); + auto writer = + reinterpret_cast<::parquet::TypedColumnWriter*>(column_writer); + if (writer->descr()->max_definition_level() == 0) { + // no nulls, just dump the data + writer->WriteBatch(data->length(), nullptr, nullptr, data_ptr); + } else if (writer->descr()->max_definition_level() == 1) { + def_levels_buffer_.Resize(data->length() * sizeof(int16_t)); + int16_t* def_levels_ptr = + reinterpret_cast(def_levels_buffer_.mutable_data()); + std::fill(def_levels_ptr, def_levels_ptr + data->length(), 1); + if (data->null_count() == 0) { + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(data->length(), def_levels_ptr, nullptr, data_ptr)); + } else { + data_buffer_.Resize((data->length() - data->null_count()) * sizeof(CType)); + auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); + size_t buffer_idx = 0; + for (size_t i = 0; i < data->length(); i++) { + if (data->IsNull(i)) { + def_levels_ptr[i] = 0; + } else { + buffer_ptr[buffer_idx++] = data_ptr[i]; + } + } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(data->length(), def_levels_ptr, nullptr, buffer_ptr)); + } + } else { + return Status::NotImplemented("no support for max definition level > 1 yet"); + } + PARQUET_CATCH_NOT_OK(writer->Close()); + return Status::OK(); +} + +Status FileWriter::Impl::Close() { + if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); } + PARQUET_CATCH_NOT_OK(writer_->Close()); + return Status::OK(); +} + +#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType, CType) \ + case Type::ENUM: \ + return TypedWriteBatch(writer, data); \ + break; + +Status FileWriter::Impl::WriteFlatColumnChunk(const PrimitiveArray* data) { + ::parquet::ColumnWriter* writer = row_group_writer_->NextColumn(); + switch (data->type_enum()) { + TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type, int32_t) + TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type, int64_t) + TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType, float) + TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType, double) + default: + return Status::NotImplemented(data->type()->ToString()); + } +} + +FileWriter::FileWriter( + MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer) + : impl_(new FileWriter::Impl(pool, std::move(writer))) {} + +Status FileWriter::NewRowGroup(int64_t chunk_size) { + return impl_->NewRowGroup(chunk_size); +} + +Status FileWriter::WriteFlatColumnChunk(const PrimitiveArray* data) { + return impl_->WriteFlatColumnChunk(data); +} + +Status FileWriter::Close() { + return impl_->Close(); +} + +FileWriter::~FileWriter() {} + +} // namespace parquet + +} // namespace arrow diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h new file mode 100644 index 00000000000..38f7d0b3a89 --- /dev/null +++ b/cpp/src/arrow/parquet/writer.h @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_PARQUET_WRITER_H +#define ARROW_PARQUET_WRITER_H + +#include + +#include "parquet/api/schema.h" +#include "parquet/api/writer.h" + +namespace arrow { + +class MemoryPool; +class PrimitiveArray; +class RowBatch; +class Status; + +namespace parquet { + +/** + * Iterative API: + * Start a new RowGroup/Chunk with NewRowGroup + * Write column-by-column the whole column chunk + */ +class FileWriter { + public: + FileWriter(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer); + + Status NewRowGroup(int64_t chunk_size); + Status WriteFlatColumnChunk(const PrimitiveArray* data); + Status Close(); + + virtual ~FileWriter(); + + private: + class Impl; + std::unique_ptr impl_; +}; + +} // namespace parquet + +} // namespace arrow + +#endif // ARROW_PARQUET_WRITER_H From 8fdd4c848d8d2d648b09998c0b8128e399996c59 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Tue, 17 May 2016 07:41:14 +0200 Subject: [PATCH 02/10] Parameterize schema creation --- cpp/src/arrow/parquet/parquet-io-test.cc | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index 17f90bcaf03..e2e69f02e5e 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -48,15 +48,8 @@ class TestParquetIO : public ::testing::Test { public: virtual void SetUp() {} - std::shared_ptr Int64Schema() { - auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64); - NodePtr node_ = - GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); - return std::static_pointer_cast(node_); - } - - std::shared_ptr DoubleSchema() { - auto pnode = PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE); + std::shared_ptr Schema(ParquetType::type parquet_type, Repetition::type repetition) { + auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type); NodePtr node_ = GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); return std::static_pointer_cast(node_); @@ -84,13 +77,13 @@ class TestParquetIO : public ::testing::Test { } std::unique_ptr Int64FileWriter() { - std::shared_ptr schema = Int64Schema(); + std::shared_ptr schema = Schema(ParquetType::INT64, Repetition::REQUIRED); sink_ = std::make_shared(); return ParquetFileWriter::Open(sink_, schema); } std::unique_ptr DoubleFileWriter() { - std::shared_ptr schema = DoubleSchema(); + std::shared_ptr schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL); sink_ = std::make_shared(); return ParquetFileWriter::Open(sink_, schema); } From 1aa7698126d34a113cba08bbd3eea7b100e84425 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Tue, 17 May 2016 07:41:31 +0200 Subject: [PATCH 03/10] Add comment to helper function --- cpp/src/arrow/parquet/parquet-io-test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index e2e69f02e5e..3b9332c0c5e 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -55,6 +55,7 @@ class TestParquetIO : public ::testing::Test { return std::static_pointer_cast(node_); } + // This helper function only supports (size/2) nulls yet. std::shared_ptr DoubleValueArray( size_t size, double value, size_t num_nulls) { std::vector values(size, value); From 77386eade88b0ea4f7f13960bccce504deb71bb1 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Tue, 17 May 2016 08:31:39 +0200 Subject: [PATCH 04/10] Templatize test functions --- cpp/src/arrow/parquet/parquet-io-test.cc | 94 +++++++++++++----------- 1 file changed, 50 insertions(+), 44 deletions(-) diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index 3b9332c0c5e..826a6a12113 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -44,47 +44,43 @@ namespace arrow { namespace parquet { +template +std::shared_ptr NonNullArray(size_t size, CType value) { + std::vector values(size, value); + NumericBuilder builder(default_memory_pool(), std::make_shared()); + builder.Append(values.data(), values.size()); + return std::static_pointer_cast(builder.Finish()); +} + +// This helper function only supports (size/2) nulls yet. +template +std::shared_ptr NullableArray( + size_t size, CType value, size_t num_nulls) { + std::vector values(size, value); + std::vector valid_bytes(size, 1); + + for (size_t i = 0; i < num_nulls; i++) { + valid_bytes[i * 2] = 0; + } + + NumericBuilder builder(default_memory_pool(), std::make_shared()); + builder.Append(values.data(), values.size(), valid_bytes.data()); + return std::static_pointer_cast(builder.Finish()); +} + class TestParquetIO : public ::testing::Test { public: virtual void SetUp() {} - std::shared_ptr Schema(ParquetType::type parquet_type, Repetition::type repetition) { + std::shared_ptr Schema( + ParquetType::type parquet_type, Repetition::type repetition) { auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type); NodePtr node_ = GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); return std::static_pointer_cast(node_); } - // This helper function only supports (size/2) nulls yet. - std::shared_ptr DoubleValueArray( - size_t size, double value, size_t num_nulls) { - std::vector values(size, value); - std::vector valid_bytes(size, 1); - - for (size_t i = 0; i < num_nulls; i++) { - valid_bytes[i * 2] = 0; - } - - DoubleBuilder builder(default_memory_pool(), std::make_shared()); - builder.Append(values.data(), values.size(), valid_bytes.data()); - return std::static_pointer_cast(builder.Finish()); - } - - std::shared_ptr Int64ValueArray(size_t size, int64_t value) { - std::vector values(size, value); - Int64Builder builder(default_memory_pool(), std::make_shared()); - builder.Append(values.data(), values.size()); - return std::static_pointer_cast(builder.Finish()); - } - - std::unique_ptr Int64FileWriter() { - std::shared_ptr schema = Schema(ParquetType::INT64, Repetition::REQUIRED); - sink_ = std::make_shared(); - return ParquetFileWriter::Open(sink_, schema); - } - - std::unique_ptr DoubleFileWriter() { - std::shared_ptr schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL); + std::unique_ptr MakeWriter(std::shared_ptr& schema) { sink_ = std::make_shared(); return ParquetFileWriter::Open(sink_, schema); } @@ -107,7 +103,8 @@ class TestParquetIO : public ::testing::Test { std::unique_ptr Int64File( std::vector& values, int num_chunks) { - std::unique_ptr file_writer = Int64FileWriter(); + std::shared_ptr schema = Schema(ParquetType::INT64, Repetition::REQUIRED); + std::unique_ptr file_writer = MakeWriter(schema); size_t chunk_size = values.size() / num_chunks; for (int i = 0; i < num_chunks; i++) { auto row_group_writer = file_writer->AppendRowGroup(chunk_size); @@ -153,9 +150,10 @@ TEST_F(TestParquetIO, SingleColumnInt64ChunkedRead) { } TEST_F(TestParquetIO, SingleColumnInt64Write) { - std::shared_ptr values = Int64ValueArray(100, 128); + std::shared_ptr values = NonNullArray(100, 128); - FileWriter writer(default_memory_pool(), Int64FileWriter()); + std::shared_ptr schema = Schema(ParquetType::INT64, Repetition::REQUIRED); + FileWriter writer(default_memory_pool(), MakeWriter(schema)); ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length()))); ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get()))); ASSERT_NO_THROW(ASSERT_OK(writer.Close())); @@ -167,9 +165,11 @@ TEST_F(TestParquetIO, SingleColumnInt64Write) { TEST_F(TestParquetIO, SingleColumnDoubleReadWrite) { // This also tests max_definition_level = 1 - std::shared_ptr values = DoubleValueArray(100, 128, 10); + std::shared_ptr values = + NullableArray(100, 128, 10); - FileWriter writer(default_memory_pool(), DoubleFileWriter()); + std::shared_ptr schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL); + FileWriter writer(default_memory_pool(), MakeWriter(schema)); ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length()))); ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get()))); ASSERT_NO_THROW(ASSERT_OK(writer.Close())); @@ -180,10 +180,12 @@ TEST_F(TestParquetIO, SingleColumnDoubleReadWrite) { } TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) { - std::shared_ptr values = Int64ValueArray(100, 128); - std::shared_ptr values_chunk = Int64ValueArray(25, 128); + std::shared_ptr values = NonNullArray(100, 128); + std::shared_ptr values_chunk = + NonNullArray(25, 128); - FileWriter writer(default_memory_pool(), Int64FileWriter()); + std::shared_ptr schema = Schema(ParquetType::INT64, Repetition::REQUIRED); + FileWriter writer(default_memory_pool(), MakeWriter(schema)); for (int i = 0; i < 4; i++) { ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length()))); ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get()))); @@ -196,11 +198,15 @@ TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) { } TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) { - std::shared_ptr values = DoubleValueArray(100, 128, 10); - std::shared_ptr values_chunk_nulls = DoubleValueArray(25, 128, 10); - std::shared_ptr values_chunk = DoubleValueArray(25, 128, 0); - - FileWriter writer(default_memory_pool(), DoubleFileWriter()); + std::shared_ptr values = + NullableArray(100, 128, 10); + std::shared_ptr values_chunk_nulls = + NullableArray(25, 128, 10); + std::shared_ptr values_chunk = + NullableArray(25, 128, 0); + + std::shared_ptr schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL); + FileWriter writer(default_memory_pool(), MakeWriter(schema)); ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length()))); ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get()))); for (int i = 0; i < 3; i++) { From efd46fbfefe326feded0ac3e4d5f4540110b0045 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Tue, 17 May 2016 08:37:23 +0200 Subject: [PATCH 05/10] Infer c-type from ArrowType --- cpp/src/arrow/parquet/parquet-io-test.cc | 31 +++++++++++------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index 826a6a12113..845574d2c53 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -44,19 +44,20 @@ namespace arrow { namespace parquet { -template -std::shared_ptr NonNullArray(size_t size, CType value) { - std::vector values(size, value); +template +std::shared_ptr NonNullArray( + size_t size, typename ArrowType::c_type value) { + std::vector values(size, value); NumericBuilder builder(default_memory_pool(), std::make_shared()); builder.Append(values.data(), values.size()); return std::static_pointer_cast(builder.Finish()); } // This helper function only supports (size/2) nulls yet. -template +template std::shared_ptr NullableArray( - size_t size, CType value, size_t num_nulls) { - std::vector values(size, value); + size_t size, typename ArrowType::c_type value, size_t num_nulls) { + std::vector values(size, value); std::vector valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { @@ -150,7 +151,7 @@ TEST_F(TestParquetIO, SingleColumnInt64ChunkedRead) { } TEST_F(TestParquetIO, SingleColumnInt64Write) { - std::shared_ptr values = NonNullArray(100, 128); + std::shared_ptr values = NonNullArray(100, 128); std::shared_ptr schema = Schema(ParquetType::INT64, Repetition::REQUIRED); FileWriter writer(default_memory_pool(), MakeWriter(schema)); @@ -165,8 +166,7 @@ TEST_F(TestParquetIO, SingleColumnInt64Write) { TEST_F(TestParquetIO, SingleColumnDoubleReadWrite) { // This also tests max_definition_level = 1 - std::shared_ptr values = - NullableArray(100, 128, 10); + std::shared_ptr values = NullableArray(100, 128, 10); std::shared_ptr schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL); FileWriter writer(default_memory_pool(), MakeWriter(schema)); @@ -180,9 +180,8 @@ TEST_F(TestParquetIO, SingleColumnDoubleReadWrite) { } TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) { - std::shared_ptr values = NonNullArray(100, 128); - std::shared_ptr values_chunk = - NonNullArray(25, 128); + std::shared_ptr values = NonNullArray(100, 128); + std::shared_ptr values_chunk = NonNullArray(25, 128); std::shared_ptr schema = Schema(ParquetType::INT64, Repetition::REQUIRED); FileWriter writer(default_memory_pool(), MakeWriter(schema)); @@ -198,12 +197,10 @@ TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) { } TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) { - std::shared_ptr values = - NullableArray(100, 128, 10); + std::shared_ptr values = NullableArray(100, 128, 10); std::shared_ptr values_chunk_nulls = - NullableArray(25, 128, 10); - std::shared_ptr values_chunk = - NullableArray(25, 128, 0); + NullableArray(25, 128, 10); + std::shared_ptr values_chunk = NullableArray(25, 128, 0); std::shared_ptr schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL); FileWriter writer(default_memory_pool(), MakeWriter(schema)); From 2179c0e5f7de93df3584596743f7eb5f7e47d4fe Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Tue, 17 May 2016 08:43:19 +0200 Subject: [PATCH 06/10] Infer c-type from ArrowType --- cpp/src/arrow/parquet/reader.cc | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index 22f255c5e3b..ecaeeca396a 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -51,7 +51,7 @@ class FlatColumnReader::Impl { virtual ~Impl() {} Status NextBatch(int batch_size, std::shared_ptr* out); - template + template Status TypedReadBatch(int batch_size, std::shared_ptr* out); private: @@ -115,13 +115,13 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor NextRowGroup(); } -template +template Status FlatColumnReader::Impl::TypedReadBatch( int batch_size, std::shared_ptr* out) { int values_to_read = batch_size; NumericBuilder builder(pool_, field_->type); while ((values_to_read > 0) && column_reader_) { - values_buffer_.Resize(values_to_read * sizeof(CType)); + values_buffer_.Resize(values_to_read * sizeof(typename ParquetType::c_type)); if (descr_->max_definition_level() > 0) { def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); } @@ -133,14 +133,15 @@ Status FlatColumnReader::Impl::TypedReadBatch( int64_t levels_read; int16_t* def_levels = reinterpret_cast(def_levels_buffer_.mutable_data()); int16_t* rep_levels = reinterpret_cast(rep_levels_buffer_.mutable_data()); - CType* values = reinterpret_cast(values_buffer_.mutable_data()); + auto values = + reinterpret_cast(values_buffer_.mutable_data()); PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(values_to_read, def_levels, rep_levels, values, &values_read)); values_to_read -= levels_read; if (descr_->max_definition_level() == 0) { RETURN_NOT_OK(builder.Append(values, values_read)); } else if (descr_->max_definition_level() == 1) { - CType* values_start = values; + typename ParquetType::c_type* values_start = values; int num_values = 0; for (int64_t i = 0; i < levels_read; i++) { if (def_levels[i] < descr_->max_definition_level()) { @@ -168,9 +169,9 @@ Status FlatColumnReader::Impl::TypedReadBatch( return Status::OK(); } -#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType, CType) \ - case Type::ENUM: \ - return TypedReadBatch(batch_size, out); \ +#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \ + case Type::ENUM: \ + return TypedReadBatch(batch_size, out); \ break; Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out) { @@ -185,10 +186,10 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* } switch (field_->type->type) { - TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type, int32_t) - TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type, int64_t) - TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType, float) - TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType, double) + TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type) + TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType) + TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType) default: return Status::NotImplemented(field_->type->ToString()); } From ba240e899e20494867e06a12b7bb1b6d0f62184e Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Tue, 17 May 2016 08:49:07 +0200 Subject: [PATCH 07/10] Incorporate writer comments --- cpp/src/arrow/parquet/writer.cc | 39 +++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc index d9042fa3652..3ad2c5b0735 100644 --- a/cpp/src/arrow/parquet/writer.cc +++ b/cpp/src/arrow/parquet/writer.cc @@ -31,7 +31,7 @@ class FileWriter::Impl { Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer); Status NewRowGroup(int64_t chunk_size); - template + template Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data); Status WriteFlatColumnChunk(const PrimitiveArray* data); Status Close(); @@ -59,31 +59,35 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) { return Status::OK(); } -template +template Status FileWriter::Impl::TypedWriteBatch( ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data) { - auto data_ptr = reinterpret_cast(data->data()->data()); + auto data_ptr = + reinterpret_cast(data->data()->data()); auto writer = reinterpret_cast<::parquet::TypedColumnWriter*>(column_writer); if (writer->descr()->max_definition_level() == 0) { // no nulls, just dump the data - writer->WriteBatch(data->length(), nullptr, nullptr, data_ptr); + PARQUET_CATCH_NOT_OK(writer->WriteBatch(data->length(), nullptr, nullptr, data_ptr)); } else if (writer->descr()->max_definition_level() == 1) { - def_levels_buffer_.Resize(data->length() * sizeof(int16_t)); + RETURN_NOT_OK(def_levels_buffer_.Resize(data->length() * sizeof(int16_t))); int16_t* def_levels_ptr = reinterpret_cast(def_levels_buffer_.mutable_data()); - std::fill(def_levels_ptr, def_levels_ptr + data->length(), 1); if (data->null_count() == 0) { + std::fill(def_levels_ptr, def_levels_ptr + data->length(), 1); PARQUET_CATCH_NOT_OK( writer->WriteBatch(data->length(), def_levels_ptr, nullptr, data_ptr)); } else { - data_buffer_.Resize((data->length() - data->null_count()) * sizeof(CType)); - auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); - size_t buffer_idx = 0; + RETURN_NOT_OK(data_buffer_.Resize( + (data->length() - data->null_count()) * sizeof(typename ParquetType::c_type))); + auto buffer_ptr = + reinterpret_cast(data_buffer_.mutable_data()); + int buffer_idx = 0; for (size_t i = 0; i < data->length(); i++) { if (data->IsNull(i)) { def_levels_ptr[i] = 0; } else { + def_levels_ptr[i] = 1; buffer_ptr[buffer_idx++] = data_ptr[i]; } } @@ -103,18 +107,19 @@ Status FileWriter::Impl::Close() { return Status::OK(); } -#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType, CType) \ - case Type::ENUM: \ - return TypedWriteBatch(writer, data); \ +#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \ + case Type::ENUM: \ + return TypedWriteBatch(writer, data); \ break; Status FileWriter::Impl::WriteFlatColumnChunk(const PrimitiveArray* data) { - ::parquet::ColumnWriter* writer = row_group_writer_->NextColumn(); + ::parquet::ColumnWriter* writer; + PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn()); switch (data->type_enum()) { - TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type, int32_t) - TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type, int64_t) - TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType, float) - TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType, double) + TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type) + TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType) + TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType) default: return Status::NotImplemented(data->type()->ToString()); } From f81021ba1b2c01703572ceeb8362f56c3f6fb2b5 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Tue, 17 May 2016 09:30:38 +0200 Subject: [PATCH 08/10] Incorporate reader comments --- cpp/src/arrow/parquet/reader.cc | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index ecaeeca396a..6b2994ce608 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -68,6 +68,8 @@ class FlatColumnReader::Impl { PoolBuffer values_buffer_; PoolBuffer def_levels_buffer_; PoolBuffer rep_levels_buffer_; + PoolBuffer values_builder_buffer_; + PoolBuffer valid_bytes_buffer_; }; FileReader::Impl::Impl( @@ -141,25 +143,22 @@ Status FlatColumnReader::Impl::TypedReadBatch( if (descr_->max_definition_level() == 0) { RETURN_NOT_OK(builder.Append(values, values_read)); } else if (descr_->max_definition_level() == 1) { - typename ParquetType::c_type* values_start = values; - int num_values = 0; + RETURN_NOT_OK(values_builder_buffer_.Resize( + levels_read * sizeof(typename ParquetType::c_type))); + RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t))); + auto values_ptr = reinterpret_cast( + values_builder_buffer_.mutable_data()); + uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data(); + int values_idx = 0; for (int64_t i = 0; i < levels_read; i++) { if (def_levels[i] < descr_->max_definition_level()) { - if (num_values > 0) { - // Bulk copy non-null values - RETURN_NOT_OK(builder.Append(values_start, num_values)); - values_start += num_values; - num_values = 0; - } - RETURN_NOT_OK(builder.AppendNull()); + valid_bytes[i] = 0; } else { - num_values++; + valid_bytes[i] = 1; + values_ptr[i] = values[values_idx++]; } } - if (num_values > 0) { - // Bulk copy non-null values - RETURN_NOT_OK(builder.Append(values_start, num_values)); - } + builder.Append(values_ptr, levels_read, valid_bytes); } else { return Status::NotImplemented("no support for max definition level > 1 yet"); } From 88ae3cab30b6a8f8f74e617e6b872dd70b1e6578 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Tue, 17 May 2016 21:09:53 +0200 Subject: [PATCH 09/10] Install arrow_parquet headers --- cpp/src/arrow/parquet/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt index 7254c485d33..c00cc9f0f25 100644 --- a/cpp/src/arrow/parquet/CMakeLists.txt +++ b/cpp/src/arrow/parquet/CMakeLists.txt @@ -46,6 +46,7 @@ install(FILES reader.h schema.h utils.h + writer.h DESTINATION include/arrow/parquet) install(TARGETS arrow_parquet From 5d95099bac290d282578c7d4c9ccdf340b145283 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Wed, 18 May 2016 08:56:29 +0200 Subject: [PATCH 10/10] Add check for flat column --- cpp/src/arrow/parquet/reader.cc | 36 ++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index 6b2994ce608..346de253606 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -26,6 +26,7 @@ #include "arrow/util/status.h" using parquet::ColumnReader; +using parquet::Repetition; using parquet::TypedColumnReader; namespace arrow { @@ -36,6 +37,7 @@ class FileReader::Impl { Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); virtual ~Impl() {} + bool CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr); Status GetFlatColumn(int i, std::unique_ptr* out); Status ReadFlatColumn(int i, std::shared_ptr* out); @@ -67,7 +69,6 @@ class FlatColumnReader::Impl { PoolBuffer values_buffer_; PoolBuffer def_levels_buffer_; - PoolBuffer rep_levels_buffer_; PoolBuffer values_builder_buffer_; PoolBuffer valid_bytes_buffer_; }; @@ -76,7 +77,20 @@ FileReader::Impl::Impl( MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader) : pool_(pool), reader_(std::move(reader)) {} +bool FileReader::Impl::CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr) { + if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) { + return false; + } else if ((descr->max_definition_level() == 1) && + (descr->schema_node()->repetition() != Repetition::OPTIONAL)) { + return false; + } + return true; +} + Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr* out) { + if (!CheckForFlatColumn(reader_->descr()->Column(i))) { + return Status::Invalid("The requested column is not flat"); + } std::unique_ptr impl( new FlatColumnReader::Impl(pool_, reader_->descr()->Column(i), reader_.get(), i)); *out = std::unique_ptr(new FlatColumnReader(std::move(impl))); @@ -111,8 +125,7 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor column_index_(column_index), next_row_group_(0), values_buffer_(pool), - def_levels_buffer_(pool), - rep_levels_buffer_(pool) { + def_levels_buffer_(pool) { NodeToField(descr_->schema_node(), &field_); NextRowGroup(); } @@ -127,22 +140,19 @@ Status FlatColumnReader::Impl::TypedReadBatch( if (descr_->max_definition_level() > 0) { def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); } - if (descr_->max_repetition_level() > 0) { - rep_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); - } auto reader = dynamic_cast*>(column_reader_.get()); int64_t values_read; int64_t levels_read; int16_t* def_levels = reinterpret_cast(def_levels_buffer_.mutable_data()); - int16_t* rep_levels = reinterpret_cast(rep_levels_buffer_.mutable_data()); auto values = reinterpret_cast(values_buffer_.mutable_data()); - PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(values_to_read, def_levels, - rep_levels, values, &values_read)); + PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch( + values_to_read, def_levels, nullptr, values, &values_read)); values_to_read -= levels_read; if (descr_->max_definition_level() == 0) { RETURN_NOT_OK(builder.Append(values, values_read)); - } else if (descr_->max_definition_level() == 1) { + } else { + // descr_->max_definition_level() == 1 RETURN_NOT_OK(values_builder_buffer_.Resize( levels_read * sizeof(typename ParquetType::c_type))); RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t))); @@ -159,8 +169,6 @@ Status FlatColumnReader::Impl::TypedReadBatch( } } builder.Append(values_ptr, levels_read, valid_bytes); - } else { - return Status::NotImplemented("no support for max definition level > 1 yet"); } if (!column_reader_->HasNext()) { NextRowGroup(); } } @@ -180,10 +188,6 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* return Status::OK(); } - if (descr_->max_repetition_level() > 0) { - return Status::NotImplemented("no support for repetition yet"); - } - switch (field_->type->type) { TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type) TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)