diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt
index cd6f05d6b5f..c00cc9f0f25 100644
--- a/cpp/src/arrow/parquet/CMakeLists.txt
+++ b/cpp/src/arrow/parquet/CMakeLists.txt
@@ -21,6 +21,7 @@ set(PARQUET_SRCS
   reader.cc
   schema.cc
+  writer.cc
 )
 
 set(PARQUET_LIBS
@@ -37,14 +38,15 @@ SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX)
 ADD_ARROW_TEST(parquet-schema-test)
 ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet)
 
-ADD_ARROW_TEST(parquet-reader-test)
-ARROW_TEST_LINK_LIBRARIES(parquet-reader-test arrow_parquet)
+ADD_ARROW_TEST(parquet-io-test)
+ARROW_TEST_LINK_LIBRARIES(parquet-io-test arrow_parquet)
 
 # Headers: top level
 install(FILES
   reader.h
   schema.h
   utils.h
+  writer.h
   DESTINATION include/arrow/parquet)
 
 install(TARGETS arrow_parquet
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
new file mode 100644
index 00000000000..845574d2c53
--- /dev/null
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include "arrow/test-util.h"
+#include "arrow/parquet/reader.h"
+#include "arrow/parquet/writer.h"
+#include "arrow/types/primitive.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+#include "parquet/api/reader.h"
+#include "parquet/api/writer.h"
+
+using ParquetBuffer = parquet::Buffer;
+using parquet::BufferReader;
+using parquet::InMemoryOutputStream;
+using parquet::ParquetFileReader;
+using parquet::ParquetFileWriter;
+using parquet::RandomAccessSource;
+using parquet::Repetition;
+using parquet::SchemaDescriptor;
+using ParquetType = parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::NodePtr;
+using parquet::schema::PrimitiveNode;
+
+namespace arrow {
+
+namespace parquet {
+
+template <typename ArrowType>
+std::shared_ptr<PrimitiveArray> NonNullArray(
+    size_t size, typename ArrowType::c_type value) {
+  std::vector<typename ArrowType::c_type> values(size, value);
+  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+// This helper function currently supports at most (size/2) nulls.
+template <typename ArrowType>
+std::shared_ptr<PrimitiveArray> NullableArray(
+    size_t size, typename ArrowType::c_type value, size_t num_nulls) {
+  std::vector<typename ArrowType::c_type> values(size, value);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+class TestParquetIO : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  std::shared_ptr<GroupNode> Schema(
+      ParquetType::type parquet_type, Repetition::type repetition) {
+    auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type);
+    NodePtr node_ =
+        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+    return std::static_pointer_cast<GroupNode>(node_);
+  }
+
+  std::unique_ptr<ParquetFileWriter> MakeWriter(std::shared_ptr<GroupNode>& schema) {
+    sink_ = std::make_shared<InMemoryOutputStream>();
+    return ParquetFileWriter::Open(sink_, schema);
+  }
+
+  std::unique_ptr<ParquetFileReader> ReaderFromSink() {
+    std::shared_ptr<ParquetBuffer> buffer = sink_->GetBuffer();
+    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+    return ParquetFileReader::Open(std::move(source));
+  }
+
+  void ReadSingleColumnFile(
+      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) {
+    arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
+    std::unique_ptr<FlatColumnReader> column_reader;
+    ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
+    ASSERT_NE(nullptr, column_reader.get());
+    ASSERT_OK(column_reader->NextBatch(100, out));
+    ASSERT_NE(nullptr, out->get());
+  }
+
+  std::unique_ptr<ParquetFileReader> Int64File(
+      std::vector<int64_t>& values, int num_chunks) {
+    std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
+    std::unique_ptr<ParquetFileWriter> file_writer = MakeWriter(schema);
+    size_t chunk_size = values.size() / num_chunks;
+    for (int i = 0; i < num_chunks; i++) {
+      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+      auto column_writer =
+          static_cast<::parquet::Int64Writer*>(row_group_writer->NextColumn());
+      int64_t* data = values.data() + i * chunk_size;
+      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
+      column_writer->Close();
+      row_group_writer->Close();
+    }
+    file_writer->Close();
+    return ReaderFromSink();
+  }
+
+ private:
+  std::shared_ptr<InMemoryOutputStream> sink_;
+};
+
+TEST_F(TestParquetIO, SingleColumnInt64Read) {
+  std::vector<int64_t> values(100, 128);
+  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 1);
+
+  std::shared_ptr<Array> out;
+  ReadSingleColumnFile(std::move(file_reader), &out);
+
+  Int64Array* out_array = static_cast<Int64Array*>(out.get());
+  for (size_t i = 0; i < values.size(); i++) {
+    EXPECT_EQ(values[i], out_array->raw_data()[i]);
+  }
+}
+
+TEST_F(TestParquetIO, SingleColumnInt64ChunkedRead) {
+  std::vector<int64_t> values(100, 128);
+  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 4);
+
+  std::shared_ptr<Array> out;
+  ReadSingleColumnFile(std::move(file_reader), &out);
+
+  Int64Array* out_array = static_cast<Int64Array*>(out.get());
+  for (size_t i = 0; i < values.size(); i++) {
+    EXPECT_EQ(values[i], out_array->raw_data()[i]);
+  }
+}
+
+TEST_F(TestParquetIO, SingleColumnInt64Write) {
+  std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128);
+
+  std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
+  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
+  ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
+  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+
+  std::shared_ptr<Array> out;
+  ReadSingleColumnFile(ReaderFromSink(), &out);
+  ASSERT_TRUE(values->Equals(out));
+}
+
+TEST_F(TestParquetIO, SingleColumnDoubleReadWrite) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);
+
+  std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
+  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
+  ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
+  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+
+  std::shared_ptr<Array> out;
+  ReadSingleColumnFile(ReaderFromSink(), &out);
+  ASSERT_TRUE(values->Equals(out));
+}
+
+TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) {
+  std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128);
+  std::shared_ptr<PrimitiveArray> values_chunk = NonNullArray<Int64Type>(25, 128);
+
+  std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
+  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  for (int i = 0; i < 4; i++) {
+    ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
+    ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
+  }
+  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+
+  std::shared_ptr<Array> out;
+  ReadSingleColumnFile(ReaderFromSink(), &out);
+  ASSERT_TRUE(values->Equals(out));
+}
+
+TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) {
+  std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);
+  std::shared_ptr<PrimitiveArray> values_chunk_nulls =
+      NullableArray<DoubleType>(25, 128, 10);
+  std::shared_ptr<PrimitiveArray> values_chunk = NullableArray<DoubleType>(25, 128, 0);
+
+  std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
+  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length())));
+  ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get())));
+  for (int i = 0; i < 3; i++) {
+    ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
+    ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
+  }
+  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+
+  std::shared_ptr<Array> out;
+  ReadSingleColumnFile(ReaderFromSink(), &out);
+  ASSERT_TRUE(values->Equals(out));
+}
+
+}  // namespace parquet
+
+}  // namespace arrow
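Reviewer note (not part of the patch): the tests above validate the null path only through Array::Equals. Since NullableArray zeroes valid_bytes at even indices, the null layout can also be asserted position by position. A minimal sketch, assuming the TestParquetIO fixture and helpers above; the test name is hypothetical:

```cpp
// Sketch only: spell out where NullableArray<DoubleType>(100, 128, 10) put its
// nulls (indices 0, 2, ..., 18) instead of relying on Equals alone.
TEST_F(TestParquetIO, SingleColumnDoubleNullPositions) {
  std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);

  std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
  FileWriter writer(default_memory_pool(), MakeWriter(schema));
  ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
  ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));

  std::shared_ptr<Array> out;
  ReadSingleColumnFile(ReaderFromSink(), &out);
  ASSERT_EQ(10, out->null_count());
  for (size_t i = 0; i < 20; i += 2) {
    ASSERT_TRUE(out->IsNull(i));  // even slots in the first 20 were nulled
  }
  ASSERT_FALSE(out->IsNull(1));   // odd slots stay valid
}
```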
diff --git a/cpp/src/arrow/parquet/parquet-reader-test.cc b/cpp/src/arrow/parquet/parquet-reader-test.cc
deleted file mode 100644
index a7fc2a89f5f..00000000000
--- a/cpp/src/arrow/parquet/parquet-reader-test.cc
+++ /dev/null
@@ -1,116 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "gtest/gtest.h"
-
-#include "arrow/test-util.h"
-#include "arrow/parquet/reader.h"
-#include "arrow/types/primitive.h"
-#include "arrow/util/memory-pool.h"
-#include "arrow/util/status.h"
-
-#include "parquet/api/reader.h"
-#include "parquet/api/writer.h"
-
-using ParquetBuffer = parquet::Buffer;
-using parquet::BufferReader;
-using parquet::InMemoryOutputStream;
-using parquet::Int64Writer;
-using parquet::ParquetFileReader;
-using parquet::ParquetFileWriter;
-using parquet::RandomAccessSource;
-using parquet::Repetition;
-using parquet::SchemaDescriptor;
-using ParquetType = parquet::Type;
-using parquet::schema::GroupNode;
-using parquet::schema::NodePtr;
-using parquet::schema::PrimitiveNode;
-
-namespace arrow {
-
-namespace parquet {
-
-class TestReadParquet : public ::testing::Test {
- public:
-  virtual void SetUp() {}
-
-  std::shared_ptr<GroupNode> Int64Schema() {
-    auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64);
-    NodePtr node_ =
-        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
-    return std::static_pointer_cast<GroupNode>(node_);
-  }
-
-  std::unique_ptr<ParquetFileReader> Int64File(
-      std::vector<int64_t>& values, int num_chunks) {
-    std::shared_ptr<GroupNode> schema = Int64Schema();
-    std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
-    auto file_writer = ParquetFileWriter::Open(sink, schema);
-    size_t chunk_size = values.size() / num_chunks;
-    for (int i = 0; i < num_chunks; i++) {
-      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
-      auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
-      int64_t* data = values.data() + i * chunk_size;
-      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
-      column_writer->Close();
-      row_group_writer->Close();
-    }
-    file_writer->Close();
-
-    std::shared_ptr<ParquetBuffer> buffer = sink->GetBuffer();
-    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
-    return ParquetFileReader::Open(std::move(source));
-  }
-
- private:
-};
-
-TEST_F(TestReadParquet, SingleColumnInt64) {
-  std::vector<int64_t> values(100, 128);
-  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 1);
-  arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
-  std::unique_ptr<FlatColumnReader> column_reader;
-  ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
-  ASSERT_NE(nullptr, column_reader.get());
-  std::shared_ptr<Array> out;
-  ASSERT_OK(column_reader->NextBatch(100, &out));
-  ASSERT_NE(nullptr, out.get());
-  Int64Array* out_array = static_cast<Int64Array*>(out.get());
-  for (size_t i = 0; i < values.size(); i++) {
-    EXPECT_EQ(values[i], out_array->raw_data()[i]);
-  }
-}
-
-TEST_F(TestReadParquet, SingleColumnInt64Chunked) {
-  std::vector<int64_t> values(100, 128);
-  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 4);
-  arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
-  std::unique_ptr<FlatColumnReader> column_reader;
-  ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
-  ASSERT_NE(nullptr, column_reader.get());
-  std::shared_ptr<Array> out;
-  ASSERT_OK(column_reader->NextBatch(100, &out));
-  ASSERT_NE(nullptr, out.get());
-  Int64Array* out_array = static_cast<Int64Array*>(out.get());
-  for (size_t i = 0; i < values.size(); i++) {
-    EXPECT_EQ(values[i], out_array->raw_data()[i]);
-  }
-}
-
-}  // namespace parquet
-
-}  // namespace arrow
diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc
index 481ded5789a..346de253606 100644
--- a/cpp/src/arrow/parquet/reader.cc
+++ b/cpp/src/arrow/parquet/reader.cc
@@ -26,6 +26,7 @@
 #include "arrow/util/status.h"
 
 using parquet::ColumnReader;
+using parquet::Repetition;
 using parquet::TypedColumnReader;
 
 namespace arrow {
@@ -36,6 +37,7 @@ class FileReader::Impl {
   Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader);
   virtual ~Impl() {}
 
+  bool CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr);
   Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
   Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
 
@@ -51,7 +53,7 @@ class FlatColumnReader::Impl {
   virtual ~Impl() {}
 
   Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
-  template <typename ArrowType, typename ParquetType, typename CType>
+  template <typename ArrowType, typename ParquetType>
   Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
 
  private:
@@ -67,14 +69,28 @@ class FlatColumnReader::Impl {
 
   PoolBuffer values_buffer_;
   PoolBuffer def_levels_buffer_;
-  PoolBuffer rep_levels_buffer_;
+  PoolBuffer values_builder_buffer_;
+  PoolBuffer valid_bytes_buffer_;
 };
 
 FileReader::Impl::Impl(
     MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
     : pool_(pool), reader_(std::move(reader)) {}
 
+bool FileReader::Impl::CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr) {
+  if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) {
+    return false;
+  } else if ((descr->max_definition_level() == 1) &&
+             (descr->schema_node()->repetition() != Repetition::OPTIONAL)) {
+    return false;
+  }
+  return true;
+}
+
 Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
+  if (!CheckForFlatColumn(reader_->descr()->Column(i))) {
+    return Status::Invalid("The requested column is not flat");
+  }
   std::unique_ptr<FlatColumnReader::Impl> impl(
       new FlatColumnReader::Impl(pool_, reader_->descr()->Column(i), reader_.get(), i));
   *out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl)));
@@ -109,37 +125,50 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor
       column_index_(column_index),
       next_row_group_(0),
       values_buffer_(pool),
-      def_levels_buffer_(pool),
-      rep_levels_buffer_(pool) {
+      def_levels_buffer_(pool) {
   NodeToField(descr_->schema_node(), &field_);
   NextRowGroup();
 }
 
-template <typename ArrowType, typename ParquetType, typename CType>
+template <typename ArrowType, typename ParquetType>
 Status FlatColumnReader::Impl::TypedReadBatch(
     int batch_size, std::shared_ptr<Array>* out) {
   int values_to_read = batch_size;
   NumericBuilder<ArrowType> builder(pool_, field_->type);
   while ((values_to_read > 0) && column_reader_) {
-    values_buffer_.Resize(values_to_read * sizeof(CType));
+    values_buffer_.Resize(values_to_read * sizeof(typename ParquetType::c_type));
     if (descr_->max_definition_level() > 0) {
       def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
     }
-    if (descr_->max_repetition_level() > 0) {
-      rep_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
-    }
     auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
     int64_t values_read;
-    CType* values = reinterpret_cast<CType*>(values_buffer_.mutable_data());
-    PARQUET_CATCH_NOT_OK(
-        values_to_read -= reader->ReadBatch(values_to_read,
-            reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()),
-            reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data()), values,
-            &values_read));
+    int64_t levels_read;
+    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    auto values =
+        reinterpret_cast<typename ParquetType::c_type*>(values_buffer_.mutable_data());
+    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+                             values_to_read, def_levels, nullptr, values, &values_read));
+    values_to_read -= levels_read;
     if (descr_->max_definition_level() == 0) {
       RETURN_NOT_OK(builder.Append(values, values_read));
     } else {
+      // descr_->max_definition_level() == 1
+      RETURN_NOT_OK(values_builder_buffer_.Resize(
+          levels_read * sizeof(typename ParquetType::c_type)));
+      RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
+      auto values_ptr = reinterpret_cast<typename ParquetType::c_type*>(
+          values_builder_buffer_.mutable_data());
+      uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
+      int values_idx = 0;
+      for (int64_t i = 0; i < levels_read; i++) {
+        if (def_levels[i] < descr_->max_definition_level()) {
+          valid_bytes[i] = 0;
+        } else {
+          valid_bytes[i] = 1;
+          values_ptr[i] = values[values_idx++];
+        }
+      }
+      RETURN_NOT_OK(builder.Append(values_ptr, levels_read, valid_bytes));
     }
     if (!column_reader_->HasNext()) { NextRowGroup(); }
   }
@@ -147,9 +176,9 @@ Status FlatColumnReader::Impl::TypedReadBatch(
   return Status::OK();
 }
 
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType, CType)              \
-  case Type::ENUM:                                                         \
-    return TypedReadBatch<ArrowType, ParquetType, CType>(batch_size, out); \
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)              \
+  case Type::ENUM:                                                  \
+    return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
     break;
 
 Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
@@ -159,15 +188,11 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>
     return Status::OK();
   }
 
-  if (descr_->max_repetition_level() > 0) {
-    return Status::NotImplemented("no support for repetition yet");
-  }
-
   switch (field_->type->type) {
-    TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type, int32_t)
-    TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type, int64_t)
-    TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType, float)
-    TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType, double)
+    TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
+    TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
+    TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
     default:
       return Status::NotImplemented(field_->type->ToString());
   }
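Reviewer note (not part of the patch): the spaced-out copy in TypedReadBatch is easiest to see with concrete numbers. ReadBatch returns one definition level per slot but packs only the non-null values densely, so the loop re-expands them and fills valid_bytes. A self-contained sketch of the same logic with illustrative values (max_definition_level == 1):

```cpp
#include <cstdint>
#include <vector>

int main() {
  std::vector<int16_t> def_levels = {1, 0, 1, 1, 0};  // one level per slot, 0 == null
  std::vector<int64_t> values = {7, 8, 9};            // non-null values, densely packed
  std::vector<uint8_t> valid_bytes(def_levels.size());
  std::vector<int64_t> spaced(def_levels.size());

  int values_idx = 0;
  for (size_t i = 0; i < def_levels.size(); i++) {
    if (def_levels[i] < 1) {   // below max definition level -> null slot
      valid_bytes[i] = 0;      // slot is ignored by the builder
    } else {
      valid_bytes[i] = 1;
      spaced[i] = values[values_idx++];
    }
  }
  // Result: spaced == {7, _, 8, 9, _} and valid_bytes == {1, 0, 1, 1, 0},
  // exactly the pair that builder.Append(values_ptr, levels_read, valid_bytes)
  // consumes in the patch above.
  return 0;
}
```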
diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc
new file mode 100644
index 00000000000..3ad2c5b0735
--- /dev/null
+++ b/cpp/src/arrow/parquet/writer.cc
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/parquet/writer.h"
+
+#include "arrow/array.h"
+#include "arrow/types/primitive.h"
+#include "arrow/parquet/utils.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+
+namespace parquet {
+
+class FileWriter::Impl {
+ public:
+  Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
+
+  Status NewRowGroup(int64_t chunk_size);
+  template <typename ParquetType>
+  Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data);
+  Status WriteFlatColumnChunk(const PrimitiveArray* data);
+  Status Close();
+
+  virtual ~Impl() {}
+
+ private:
+  MemoryPool* pool_;
+  PoolBuffer data_buffer_;
+  PoolBuffer def_levels_buffer_;
+  std::unique_ptr<::parquet::ParquetFileWriter> writer_;
+  ::parquet::RowGroupWriter* row_group_writer_;
+};
+
+FileWriter::Impl::Impl(
+    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
+    : pool_(pool),
+      data_buffer_(pool),
+      writer_(std::move(writer)),
+      row_group_writer_(nullptr) {}
+
+Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
+  if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+  PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup(chunk_size));
+  return Status::OK();
+}
+
+template <typename ParquetType>
+Status FileWriter::Impl::TypedWriteBatch(
+    ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data) {
+  auto data_ptr =
+      reinterpret_cast<const typename ParquetType::c_type*>(data->data()->data());
+  auto writer =
+      reinterpret_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer);
+  if (writer->descr()->max_definition_level() == 0) {
+    // no nulls, just dump the data
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(data->length(), nullptr, nullptr, data_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(data->length() * sizeof(int16_t)));
+    int16_t* def_levels_ptr =
+        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    if (data->null_count() == 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + data->length(), 1);
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(data->length(), def_levels_ptr, nullptr, data_ptr));
+    } else {
+      RETURN_NOT_OK(data_buffer_.Resize(
+          (data->length() - data->null_count()) * sizeof(typename ParquetType::c_type)));
+      auto buffer_ptr =
+          reinterpret_cast<typename ParquetType::c_type*>(data_buffer_.mutable_data());
+      int buffer_idx = 0;
+      for (size_t i = 0; i < data->length(); i++) {
+        if (data->IsNull(i)) {
+          def_levels_ptr[i] = 0;
+        } else {
+          def_levels_ptr[i] = 1;
+          buffer_ptr[buffer_idx++] = data_ptr[i];
+        }
+      }
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(data->length(), def_levels_ptr, nullptr, buffer_ptr));
+    }
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+Status FileWriter::Impl::Close() {
+  if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+  PARQUET_CATCH_NOT_OK(writer_->Close());
+  return Status::OK();
+}
+
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)  \
+  case Type::ENUM:                                      \
+    return TypedWriteBatch<ParquetType>(writer, data);  \
+    break;
+
+Status FileWriter::Impl::WriteFlatColumnChunk(const PrimitiveArray* data) {
+  ::parquet::ColumnWriter* writer;
+  PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
+  switch (data->type_enum()) {
+    TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
+    TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
+    TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
+    default:
+      return Status::NotImplemented(data->type()->ToString());
+  }
+}
+
+FileWriter::FileWriter(
+    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
+    : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
+
+Status FileWriter::NewRowGroup(int64_t chunk_size) {
+  return impl_->NewRowGroup(chunk_size);
+}
+
+Status FileWriter::WriteFlatColumnChunk(const PrimitiveArray* data) {
+  return impl_->WriteFlatColumnChunk(data);
+}
+
+Status FileWriter::Close() {
+  return impl_->Close();
+}
+
+FileWriter::~FileWriter() {}
+
+}  // namespace parquet
+
+}  // namespace arrow
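Reviewer note (not part of the patch): TypedWriteBatch performs the inverse of the reader's expansion above: nulls become definition level 0 and are dropped from the value stream before WriteBatch is called. The same logic in isolation, with illustrative values:

```cpp
#include <cstdint>
#include <vector>

int main() {
  // Logical column with nulls at indices 1 and 4 (mirrors IsNull() on the array).
  std::vector<int64_t> data = {7, 0, 8, 9, 0};
  std::vector<bool> is_null = {false, true, false, false, true};

  std::vector<int16_t> def_levels(data.size());
  std::vector<int64_t> compact;  // what actually gets handed to WriteBatch

  for (size_t i = 0; i < data.size(); i++) {
    if (is_null[i]) {
      def_levels[i] = 0;           // null: level only, no value written
    } else {
      def_levels[i] = 1;
      compact.push_back(data[i]);  // non-null: value kept, densely packed
    }
  }
  // Result: def_levels == {1, 0, 1, 1, 0} and compact == {7, 8, 9} -- the
  // reader sketch earlier reconstructs the original column from this pair.
  return 0;
}
```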
diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h
new file mode 100644
index 00000000000..38f7d0b3a89
--- /dev/null
+++ b/cpp/src/arrow/parquet/writer.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_PARQUET_WRITER_H
+#define ARROW_PARQUET_WRITER_H
+
+#include <memory>
+
+#include "parquet/api/schema.h"
+#include "parquet/api/writer.h"
+
+namespace arrow {
+
+class MemoryPool;
+class PrimitiveArray;
+class RowBatch;
+class Status;
+
+namespace parquet {
+
+/**
+ * Iterative API:
+ *  Start a new RowGroup (chunk) with NewRowGroup.
+ *  Write the whole column chunk, one column at a time, with WriteFlatColumnChunk.
+ */
+class FileWriter {
+ public:
+  FileWriter(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
+
+  Status NewRowGroup(int64_t chunk_size);
+  Status WriteFlatColumnChunk(const PrimitiveArray* data);
+  Status Close();
+
+  virtual ~FileWriter();
+
+ private:
+  class Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace parquet
+
+}  // namespace arrow
+
+#endif  // ARROW_PARQUET_WRITER_H
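Reviewer note (not part of the patch): a minimal end-to-end sketch of the iterative API declared above, mirroring the sink and schema setup used in parquet-io-test.cc. The helper name WriteSingleColumn is hypothetical:

```cpp
#include <memory>

#include "arrow/parquet/writer.h"
#include "arrow/types/primitive.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"
#include "parquet/api/schema.h"
#include "parquet/api/writer.h"

// Hypothetical helper: one row group, one flat column chunk, then Close().
arrow::Status WriteSingleColumn(
    const std::shared_ptr<::parquet::schema::GroupNode>& schema,
    const arrow::PrimitiveArray* values,
    const std::shared_ptr<::parquet::InMemoryOutputStream>& sink) {
  std::unique_ptr<::parquet::ParquetFileWriter> parquet_writer =
      ::parquet::ParquetFileWriter::Open(sink, schema);
  arrow::parquet::FileWriter writer(
      arrow::default_memory_pool(), std::move(parquet_writer));
  RETURN_NOT_OK(writer.NewRowGroup(values->length()));
  RETURN_NOT_OK(writer.WriteFlatColumnChunk(values));
  return writer.Close();
}
```

Reading the file back then goes through FileReader::GetFlatColumn / NextBatch exactly as the tests do.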