From 142a3645491409eb6d51283fb10ab72d8f8cbe04 Mon Sep 17 00:00:00 2001 From: "Korn, Uwe" Date: Tue, 13 Sep 2016 08:47:36 +0200 Subject: [PATCH 01/10] PARQUET-712: Add library to read into Arrow memory --- CMakeLists.txt | 35 ++ cmake_modules/FindArrow.cmake | 87 +++ src/parquet/arrow/CMakeLists.txt | 68 +++ src/parquet/arrow/arrow-io-test.cc | 185 +++++++ src/parquet/arrow/arrow-reader-writer-test.cc | 508 ++++++++++++++++++ src/parquet/arrow/arrow-schema-test.cc | 264 +++++++++ src/parquet/arrow/io.cc | 108 ++++ src/parquet/arrow/io.h | 88 +++ src/parquet/arrow/reader.cc | 410 ++++++++++++++ src/parquet/arrow/reader.h | 149 +++++ src/parquet/arrow/schema.cc | 351 ++++++++++++ src/parquet/arrow/schema.h | 56 ++ src/parquet/arrow/test-util.h | 193 +++++++ src/parquet/arrow/utils.h | 52 ++ src/parquet/arrow/writer.cc | 373 +++++++++++++ src/parquet/arrow/writer.h | 78 +++ thirdparty/build_thirdparty.sh | 10 + thirdparty/download_thirdparty.sh | 5 + thirdparty/set_thirdparty_env.sh | 1 + thirdparty/versions.sh | 4 + 20 files changed, 3025 insertions(+) create mode 100644 cmake_modules/FindArrow.cmake create mode 100644 src/parquet/arrow/CMakeLists.txt create mode 100644 src/parquet/arrow/arrow-io-test.cc create mode 100644 src/parquet/arrow/arrow-reader-writer-test.cc create mode 100644 src/parquet/arrow/arrow-schema-test.cc create mode 100644 src/parquet/arrow/io.cc create mode 100644 src/parquet/arrow/io.h create mode 100644 src/parquet/arrow/reader.cc create mode 100644 src/parquet/arrow/reader.h create mode 100644 src/parquet/arrow/schema.cc create mode 100644 src/parquet/arrow/schema.h create mode 100644 src/parquet/arrow/test-util.h create mode 100644 src/parquet/arrow/utils.h create mode 100644 src/parquet/arrow/writer.cc create mode 100644 src/parquet/arrow/writer.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 3878056e..d085da95 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -82,6 +82,9 @@ if ("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") option(PARQUET_BUILD_EXECUTABLES "Build the libparquet executable CLI tools" ON) + option(PARQUET_ARROW + "Build the Arrow support" + OFF) endif() # If build in-source, create the latest symlink. If build out-of-source, which is @@ -247,6 +250,16 @@ function(ADD_PARQUET_TEST_DEPENDENCIES REL_TEST_NAME) add_dependencies(${TEST_NAME} ${ARGN}) endfunction() +# A wrapper for add_dependencies() that is compatible with PARQUET_BUILD_TESTS. 
+function(ADD_PARQUET_LINK_LIBRARIES REL_TEST_NAME) + if(NOT PARQUET_BUILD_TESTS) + return() + endif() + get_filename_component(TEST_NAME ${REL_TEST_NAME} NAME_WE) + + target_link_libraries(${TEST_NAME} ${ARGN}) +endfunction() + enable_testing() ############################################################ @@ -553,6 +566,12 @@ if (PARQUET_BUILD_SHARED) target_link_libraries(parquet_shared LINK_PUBLIC ${LIBPARQUET_LINK_LIBS} LINK_PRIVATE ${LIBPARQUET_PRIVATE_LINK_LIBS}) + if (APPLE) + set_target_properties(parquet_shared + PROPERTIES + BUILD_WITH_INSTALL_RPATH ON + INSTALL_NAME_DIR "@rpath") + endif() endif() if (PARQUET_BUILD_STATIC) @@ -583,6 +602,22 @@ add_dependencies(parquet_objlib parquet_thrift) add_subdirectory(benchmarks) add_subdirectory(tools) +# Arrow +if (PARQUET_ARROW) + find_package(Arrow REQUIRED) + include_directories(SYSTEM ${ARROW_INCLUDE_DIR}) + add_library(arrow SHARED IMPORTED) + set_target_properties(arrow PROPERTIES IMPORTED_LOCATION ${ARROW_SHARED_LIB}) + add_library(arrow_io SHARED IMPORTED) + set_target_properties(arrow_io PROPERTIES IMPORTED_LOCATION ${ARROW_IO_SHARED_LIB}) + #add_library(arrow_static STAIC IMPORTED) + #set_target_properties(arrow_static PROPERTIES IMPORTED_LOCATION ${ARROW_STATIC_LIB}) + #add_library(arrow_io_static STATIC IMPORTED) + #set_target_properties(arrow_io_static PROPERTIES IMPORTED_LOCATION ${ARROW_IO_STATIC_LIB}) + + add_subdirectory(src/parquet/arrow) +endif() + add_custom_target(clean-all COMMAND ${CMAKE_BUILD_TOOL} clean COMMAND ${CMAKE_COMMAND} -P cmake_modules/clean-all.cmake diff --git a/cmake_modules/FindArrow.cmake b/cmake_modules/FindArrow.cmake new file mode 100644 index 00000000..91d0e714 --- /dev/null +++ b/cmake_modules/FindArrow.cmake @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# - Find ARROW (arrow/api.h, libarrow.a, libarrow.so) +# This module defines +# ARROW_INCLUDE_DIR, directory containing headers +# ARROW_LIBS, directory containing arrow libraries +# ARROW_STATIC_LIB, path to libarrow.a +# ARROW_SHARED_LIB, path to libarrow's shared library +# ARROW_FOUND, whether arrow has been found + +set(ARROW_SEARCH_HEADER_PATHS + $ENV{ARROW_HOME}/include +) + +set(ARROW_SEARCH_LIB_PATH + $ENV{ARROW_HOME}/lib +) + +find_path(ARROW_INCLUDE_DIR arrow/array.h PATHS + ${ARROW_SEARCH_HEADER_PATHS} + # make sure we don't accidentally pick up a different version + NO_DEFAULT_PATH +) + +find_library(ARROW_LIB_PATH NAMES arrow + PATHS + ${ARROW_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) + +find_library(ARROW_IO_LIB_PATH NAMES arrow_io + PATHS + ${ARROW_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) + +if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH) + set(ARROW_FOUND TRUE) + set(ARROW_LIB_NAME libarrow) + set(ARROW_IO_LIB_NAME libarrow_io) + + set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH}) + set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a) + set(ARROW_SHARED_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + + set(ARROW_IO_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IO_LIB_NAME}.a) + set(ARROW_IO_SHARED_LIB ${ARROW_LIBS}/${ARROW_IO_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + if (NOT Arrow_FIND_QUIETLY) + message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}") + message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}") + endif () +else () + if (NOT Arrow_FIND_QUIETLY) + set(ARROW_ERR_MSG "Could not find the Arrow library. Looked for headers") + set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_HEADER_PATHS}, and for libs") + set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_LIB_PATH}") + if (Arrow_FIND_REQUIRED) + message(FATAL_ERROR "${ARROW_ERR_MSG}") + else (Arrow_FIND_REQUIRED) + message(STATUS "${ARROW_ERR_MSG}") + endif (Arrow_FIND_REQUIRED) + endif () + set(ARROW_FOUND FALSE) +endif () + +mark_as_advanced( + ARROW_FOUND + ARROW_INCLUDE_DIR + ARROW_LIBS + ARROW_STATIC_LIB + ARROW_SHARED_LIB + ARROW_IO_STATIC_LIB + ARROW_IO_SHARED_LIB +) diff --git a/src/parquet/arrow/CMakeLists.txt b/src/parquet/arrow/CMakeLists.txt new file mode 100644 index 00000000..3733d031 --- /dev/null +++ b/src/parquet/arrow/CMakeLists.txt @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
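+
+# Building this adapter is opt-in. A typical out-of-source build might look
+# like the following sketch; it assumes an Arrow installation discoverable
+# through the ARROW_HOME environment variable consulted by FindArrow.cmake:
+#
+#   export ARROW_HOME=/path/to/arrow/install
+#   cmake -DPARQUET_ARROW=ON ..
+#   make parquet_arrow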
+ +# ---------------------------------------------------------------------- +# parquet_arrow : Arrow <-> Parquet adapter + +set(PARQUET_ARROW_SRCS + io.cc + reader.cc + schema.cc + writer.cc +) + +set(PARQUET_ARROW_LIBS + arrow + arrow_io + parquet_shared +) + +add_library(parquet_arrow SHARED + ${PARQUET_ARROW_SRCS} +) +target_link_libraries(parquet_arrow ${PARQUET_ARROW_LIBS}) +SET_TARGET_PROPERTIES(parquet_arrow PROPERTIES LINKER_LANGUAGE CXX) + +if (APPLE) + set_target_properties(parquet_arrow + PROPERTIES + BUILD_WITH_INSTALL_RPATH ON + INSTALL_NAME_DIR "@rpath") +endif() + +ADD_PARQUET_TEST(arrow-schema-test) +ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow arrow) + +ADD_PARQUET_TEST(arrow-io-test) +ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow) + +ADD_PARQUET_TEST(arrow-reader-writer-test) +ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow) + +# Headers: top level +install(FILES + io.h + reader.h + schema.h + utils.h + writer.h + DESTINATION include/parquet/arrow) + +install(TARGETS parquet_arrow + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib) + diff --git a/src/parquet/arrow/arrow-io-test.cc b/src/parquet/arrow/arrow-io-test.cc new file mode 100644 index 00000000..83a8b9bb --- /dev/null +++ b/src/parquet/arrow/arrow-io-test.cc @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
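+
+// These tests exercise the Arrow <-> parquet-cpp IO bridge. In sketch form
+// (`file` stands for any ::arrow::io::RandomAccessFile implementation):
+//
+//   ParquetAllocator allocator(::arrow::default_memory_pool());
+//   auto source = std::make_shared<ParquetReadSource>(&allocator);
+//   ASSERT_OK(source->Open(file));
+//   // source can now be handed to parquet-cpp as a RandomAccessSource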
+ +#include +#include +#include +#include + +#include "gtest/gtest.h" + +#include "arrow/test-util.h" +#include "arrow/util/memory-pool.h" +#include "arrow/util/status.h" + +#include "parquet/api/io.h" +#include "parquet/arrow/io.h" + +using arrow::MemoryPool; +using arrow::Status; + +namespace parquet { +namespace arrow { + +// Allocator tests + +TEST(TestParquetAllocator, DefaultCtor) { + ParquetAllocator allocator; + + const int buffer_size = 10; + + uint8_t* buffer = nullptr; + ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size);); + + // valgrind will complain if we write into nullptr + memset(buffer, 0, buffer_size); + + allocator.Free(buffer, buffer_size); +} + +// Pass through to the default memory pool +class TrackingPool : public MemoryPool { + public: + TrackingPool() : pool_(::arrow::default_memory_pool()), bytes_allocated_(0) {} + + Status Allocate(int64_t size, uint8_t** out) override { + RETURN_NOT_OK(pool_->Allocate(size, out)); + bytes_allocated_ += size; + return Status::OK(); + } + + void Free(uint8_t* buffer, int64_t size) override { + pool_->Free(buffer, size); + bytes_allocated_ -= size; + } + + int64_t bytes_allocated() const override { return bytes_allocated_; } + + private: + MemoryPool* pool_; + int64_t bytes_allocated_; +}; + +TEST(TestParquetAllocator, CustomPool) { + TrackingPool pool; + + ParquetAllocator allocator(&pool); + + ASSERT_EQ(&pool, allocator.pool()); + + const int buffer_size = 10; + + uint8_t* buffer = nullptr; + ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size);); + + ASSERT_EQ(buffer_size, pool.bytes_allocated()); + + // valgrind will complain if we write into nullptr + memset(buffer, 0, buffer_size); + + allocator.Free(buffer, buffer_size); + + ASSERT_EQ(0, pool.bytes_allocated()); +} + +// ---------------------------------------------------------------------- +// Read source tests + +class BufferReader : public ::arrow::io::RandomAccessFile { + public: + BufferReader(const uint8_t* buffer, int buffer_size) + : buffer_(buffer), buffer_size_(buffer_size), position_(0) {} + + Status Close() override { + // no-op + return Status::OK(); + } + + Status Tell(int64_t* position) override { + *position = position_; + return Status::OK(); + } + + Status ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override { + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, bytes_read, buffer); + } + + Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override { + memcpy(buffer, buffer_ + position_, nbytes); + *bytes_read = std::min(nbytes, buffer_size_ - position_); + position_ += *bytes_read; + return Status::OK(); + } + + Status GetSize(int64_t* size) override { + *size = buffer_size_; + return Status::OK(); + } + + Status Seek(int64_t position) override { + if (position < 0 || position >= buffer_size_) { + return Status::IOError("position out of bounds"); + } + + position_ = position; + return Status::OK(); + } + + private: + const uint8_t* buffer_; + int buffer_size_; + int64_t position_; +}; + +TEST(TestParquetReadSource, Basics) { + std::string data = "this is the data"; + auto data_buffer = reinterpret_cast(data.c_str()); + + ParquetAllocator allocator(::arrow::default_memory_pool()); + + auto file = std::make_shared(data_buffer, data.size()); + auto source = std::make_shared(&allocator); + + ASSERT_OK(source->Open(file)); + + ASSERT_EQ(0, source->Tell()); + ASSERT_NO_THROW(source->Seek(5)); + ASSERT_EQ(5, source->Tell()); + ASSERT_NO_THROW(source->Seek(0)); + + // Seek out of bounds + 
ASSERT_THROW(source->Seek(100), ::parquet::ParquetException); + + uint8_t buffer[50]; + + ASSERT_NO_THROW(source->Read(4, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, "this", 4)); + ASSERT_EQ(4, source->Tell()); + + std::shared_ptr<::parquet::Buffer> pq_buffer; + + ASSERT_NO_THROW(pq_buffer = source->Read(7)); + + auto expected_buffer = std::make_shared<::parquet::Buffer>(data_buffer + 4, 7); + + ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get())); +} + +} // namespace arrow +} // namespace parquet diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc new file mode 100644 index 00000000..8faab43c --- /dev/null +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -0,0 +1,508 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gtest/gtest.h" + +#include "parquet/api/reader.h" +#include "parquet/api/writer.h" + +#include "parquet/arrow/test-util.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/writer.h" + +#include "arrow/test-util.h" +#include "arrow/types/construct.h" +#include "arrow/types/primitive.h" +#include "arrow/types/string.h" +#include "arrow/util/memory-pool.h" +#include "arrow/util/status.h" + +using arrow::Array; +using arrow::ChunkedArray; +using arrow::default_memory_pool; +using arrow::PoolBuffer; +using arrow::PrimitiveArray; +using arrow::Status; +using arrow::Table; + +using ParquetBuffer = parquet::Buffer; +using parquet::BufferReader; +using parquet::default_writer_properties; +using parquet::InMemoryOutputStream; +using parquet::LogicalType; +using parquet::ParquetFileReader; +using parquet::ParquetFileWriter; +using parquet::RandomAccessSource; +using parquet::Repetition; +using parquet::SchemaDescriptor; +using parquet::ParquetVersion; +using ParquetType = parquet::Type; +using parquet::schema::GroupNode; +using parquet::schema::NodePtr; +using parquet::schema::PrimitiveNode; + +namespace parquet { + +namespace arrow { + +const int SMALL_SIZE = 100; +const int LARGE_SIZE = 10000; + +template +struct test_traits {}; + +template <> +struct test_traits<::arrow::BooleanType> { + static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static uint8_t const value; +}; + +const uint8_t test_traits<::arrow::BooleanType>::value(1); + +template <> +struct test_traits<::arrow::UInt8Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_8; + static uint8_t const value; +}; + +const uint8_t test_traits<::arrow::UInt8Type>::value(64); + +template <> +struct test_traits<::arrow::Int8Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static 
constexpr LogicalType::type logical_enum = LogicalType::INT_8;
+ static int8_t const value;
+};
+
+const int8_t test_traits<::arrow::Int8Type>::value(-64);
+
+template <>
+struct test_traits<::arrow::UInt16Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
+ static uint16_t const value;
+};
+
+const uint16_t test_traits<::arrow::UInt16Type>::value(1024);
+
+template <>
+struct test_traits<::arrow::Int16Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
+ static int16_t const value;
+};
+
+const int16_t test_traits<::arrow::Int16Type>::value(-1024);
+
+template <>
+struct test_traits<::arrow::UInt32Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
+ static uint32_t const value;
+};
+
+const uint32_t test_traits<::arrow::UInt32Type>::value(1024);
+
+template <>
+struct test_traits<::arrow::Int32Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static int32_t const value;
+};
+
+const int32_t test_traits<::arrow::Int32Type>::value(-1024);
+
+template <>
+struct test_traits<::arrow::UInt64Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
+ static uint64_t const value;
+};
+
+const uint64_t test_traits<::arrow::UInt64Type>::value(1024);
+
+template <>
+struct test_traits<::arrow::Int64Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static int64_t const value;
+};
+
+const int64_t test_traits<::arrow::Int64Type>::value(-1024);
+
+template <>
+struct test_traits<::arrow::TimestampType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+ static constexpr LogicalType::type logical_enum = LogicalType::TIMESTAMP_MILLIS;
+ static int64_t const value;
+};
+
+const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);
+
+template <>
+struct test_traits<::arrow::FloatType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static float const value;
+};
+
+const float test_traits<::arrow::FloatType>::value(2.1f);
+
+template <>
+struct test_traits<::arrow::DoubleType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static double const value;
+};
+
+const double test_traits<::arrow::DoubleType>::value(4.2);
+
+template <>
+struct test_traits<::arrow::StringType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
+ static constexpr LogicalType::type logical_enum = LogicalType::UTF8;
+ static std::string const value;
+};
+
+const std::string test_traits<::arrow::StringType>::value("Test");
+
+template <typename T>
+using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
+
+template <typename T>
+using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
+
+template <typename TestType>
+class TestParquetIO : public ::testing::Test {
+ public:
+ virtual void SetUp() {}
+
+ std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) {
+ auto pnode = PrimitiveNode::Make("column1", repetition,
test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum);
+ NodePtr node_ =
+ GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+ return std::static_pointer_cast<GroupNode>(node_);
+ }
+
+ std::unique_ptr<ParquetFileWriter> MakeWriter(
+ const std::shared_ptr<GroupNode>& schema) {
+ sink_ = std::make_shared<InMemoryOutputStream>();
+ return ParquetFileWriter::Open(sink_, schema);
+ }
+
+ std::unique_ptr<ParquetFileReader> ReaderFromSink() {
+ std::shared_ptr<ParquetBuffer> buffer = sink_->GetBuffer();
+ std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+ return ParquetFileReader::Open(std::move(source));
+ }
+
+ void ReadSingleColumnFile(
+ std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) {
+ FileReader reader(::arrow::default_memory_pool(), std::move(file_reader));
+ std::unique_ptr<FlatColumnReader> column_reader;
+ ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader));
+ ASSERT_NE(nullptr, column_reader.get());
+
+ ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
+ ASSERT_NE(nullptr, out->get());
+ }
+
+ void ReadAndCheckSingleColumnFile(::arrow::Array* values) {
+ std::shared_ptr<::arrow::Array> out;
+ ReadSingleColumnFile(ReaderFromSink(), &out);
+ ASSERT_TRUE(values->Equals(out));
+ }
+
+ void ReadTableFromFile(
+ std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) {
+ FileReader reader(::arrow::default_memory_pool(), std::move(file_reader));
+ ASSERT_OK_NO_THROW(reader.ReadFlatTable(out));
+ ASSERT_NE(nullptr, out->get());
+ }
+
+ void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
+ std::shared_ptr<::arrow::Table> out;
+ ReadTableFromFile(ReaderFromSink(), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(values->length(), out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+ }
+
+ template <typename ArrowType>
+ void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema,
+ const std::shared_ptr<ArrowType>& values) {
+ FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema));
+ ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
+ ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get()));
+ ASSERT_OK_NO_THROW(writer.Close());
+ }
+
+ std::shared_ptr<InMemoryOutputStream> sink_;
+};
+
+// We have separate tests for UInt32Type as this is currently the only type
+// where a roundtrip does not yield the identical Array structure: for
+// Parquet version 1.0 we write a UInt32 Array but read back an Int64 Array.
+
+typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type,
+ ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
+ ::arrow::StringType> TestTypes;
+
+TYPED_TEST_CASE(TestParquetIO, TestTypes);
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
+ auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ this->WriteFlatColumn(schema, values);
+
+ this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
+ auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), this->sink_,
+ values->length(), default_writer_properties()));
+
+ std::shared_ptr<Table> out;
+ this->ReadTableFromFile(this->ReaderFromSink(), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(100, out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
+ // This also tests max_definition_level = 1
+ auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+ this->WriteFlatColumn(schema, values);
+
+ this->ReadAndCheckSingleColumnFile(values.get());
+}
+
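+// The table-based tests in this file share one write/read pattern; in sketch
+// form (`values` and the nullable flag vary per test):
+//
+//   std::shared_ptr<Table> table = MakeSimpleTable(values, nullable);
+//   auto sink = std::make_shared<InMemoryOutputStream>();
+//   WriteFlatTable(table.get(), ::arrow::default_memory_pool(), sink,
+//       values->length(), default_writer_properties());
+//   // ... then read back via FileReader::ReadFlatTable() and compare.
+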
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
+ // This also tests max_definition_level = 1
+ std::shared_ptr<Array> values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), this->sink_,
+ values->length(), default_writer_properties()));
+
+ this->ReadAndCheckSingleColumnTable(values);
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
+ auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+ int64_t chunk_size = values->length() / 4;
+
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
+ for (int i = 0; i < 4; i++) {
+ ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+ ASSERT_OK_NO_THROW(
+ writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+ }
+ ASSERT_OK_NO_THROW(writer.Close());
+
+ this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
+ auto values = NonNullArray<TypeParam>(LARGE_SIZE);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_OK_NO_THROW(WriteFlatTable(
+ table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
+
+ this->ReadAndCheckSingleColumnTable(values);
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
+ int64_t chunk_size = SMALL_SIZE / 4;
+ auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+ FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
+ for (int i = 0; i < 4; i++) {
+ ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+ ASSERT_OK_NO_THROW(
+ writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+ }
+ ASSERT_OK_NO_THROW(writer.Close());
+
+ this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
+ // This also tests max_definition_level = 1
+ auto values = NullableArray<TypeParam>(LARGE_SIZE, 100);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_OK_NO_THROW(WriteFlatTable(
+ table.get(), ::arrow::default_memory_pool(), this->sink_, 512, default_writer_properties()));
+
+ this->ReadAndCheckSingleColumnTable(values);
+}
+
+using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
+
+TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compatibility) {
+ // This also tests max_definition_level = 1
+ std::shared_ptr<Array> values = NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+ // A Parquet 2.0 roundtrip should yield a uint32_t column again
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ std::shared_ptr<::parquet::WriterProperties> properties =
+ ::parquet::WriterProperties::Builder()
+ .version(ParquetVersion::PARQUET_2_0)
+ ->build();
+ ASSERT_OK_NO_THROW(
+ WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
+ this->ReadAndCheckSingleColumnTable(values);
+}
+
+TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compatibility) {
+ // This also tests max_definition_level = 1
+ std::shared_ptr<Array> values = NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+ // Parquet 1.0 returns an int64_t column, as there is no way to tell a
+ // Parquet 1.0 reader that a column is unsigned.
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ std::shared_ptr<::parquet::WriterProperties> properties =
+ ::parquet::WriterProperties::Builder()
+ .version(ParquetVersion::PARQUET_1_0)
+ ->build();
+ ASSERT_OK_NO_THROW(
+ WriteFlatTable(table.get(), ::arrow::default_memory_pool(), this->sink_, 512, properties));
+
+ std::shared_ptr<Array> expected_values;
+ std::shared_ptr<PoolBuffer> int64_data =
+ std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
+ {
+ ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
+ int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
+ const uint32_t* uint32_data_ptr =
+ reinterpret_cast<const uint32_t*>(values->data()->data());
+ // std::copy might be faster, but this makes the casts explicit
+ for (int64_t i = 0; i < values->length(); i++) {
+ int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
+ }
+ }
+ ASSERT_OK(MakePrimitiveArray(std::make_shared<::arrow::Int64Type>(), values->length(),
+ int64_data, values->null_count(), values->null_bitmap(), &expected_values));
+ this->ReadAndCheckSingleColumnTable(expected_values);
+}
+
+template <typename TestType>
+using ParquetCDataType = typename ParquetDataType<TestType>::c_type;
+
+template <typename TestType>
+class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
+ public:
+ typedef typename TestType::c_type T;
+
+ void MakeTestFile(std::vector<T>& values, int num_chunks,
+ std::unique_ptr<ParquetFileReader>* file_reader) {
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
+ size_t chunk_size = values.size() / num_chunks;
+ // Convert to Parquet's expected physical type
+ std::vector<uint8_t> values_buffer(
+ sizeof(ParquetCDataType<TestType>) * values.size());
+ auto values_parquet =
+ reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
+ std::copy(values.cbegin(), values.cend(), values_parquet);
+ for (int i = 0; i < num_chunks; i++) {
+ auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+ auto column_writer =
+ static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
+ ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
+ column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
+ column_writer->Close();
+ row_group_writer->Close();
+ }
+ file_writer->Close();
+ *file_reader = this->ReaderFromSink();
+ }
+
+ void CheckSingleColumnRequiredTableRead(int num_chunks) {
+ std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+ std::unique_ptr<ParquetFileReader> file_reader;
+ ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+ std::shared_ptr<Table> out;
+ this->ReadTableFromFile(std::move(file_reader), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ExpectArray<T>(values.data(), chunked_array->chunk(0).get());
+ }
+
+ void CheckSingleColumnRequiredRead(int num_chunks) {
+ std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+ std::unique_ptr<ParquetFileReader> file_reader;
+ ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+ std::shared_ptr<Array> out;
+ this->ReadSingleColumnFile(std::move(file_reader), &out);
+
+ ExpectArray<T>(values.data(), out.get());
+ }
+};
+
+typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type,
+ ::arrow::UInt32Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType,
+ ::arrow::DoubleType> PrimitiveTestTypes;
+
+TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
+ this->CheckSingleColumnRequiredRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
+ this->CheckSingleColumnRequiredTableRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
+ this->CheckSingleColumnRequiredRead(4);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
+ this->CheckSingleColumnRequiredTableRead(4);
+}
+
+} // namespace arrow
+
+} // namespace parquet
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
new file mode 100644
index 00000000..31a47223
--- /dev/null
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
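+
+// The conversions below are driven by FromParquetSchema(); roughly (sketch;
+// `descr_` is a SchemaDescriptor initialized from a GroupNode, as in the
+// fixture below):
+//
+//   std::shared_ptr<::arrow::Schema> result;
+//   ::arrow::Status st = FromParquetSchema(&descr_, &result);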
+ +#include +#include + +#include "gtest/gtest.h" + +#include "parquet/arrow/schema.h" + +#include "arrow/test-util.h" +#include "arrow/type.h" +#include "arrow/types/datetime.h" +#include "arrow/types/decimal.h" +#include "arrow/util/status.h" + +using arrow::Field; + +using ParquetType = parquet::Type; +using parquet::LogicalType; +using parquet::Repetition; +using parquet::schema::NodePtr; +using parquet::schema::GroupNode; +using parquet::schema::PrimitiveNode; + +namespace parquet { + +namespace arrow { + +const auto BOOL = std::make_shared<::arrow::BooleanType>(); +const auto UINT8 = std::make_shared<::arrow::UInt8Type>(); +const auto INT32 = std::make_shared<::arrow::Int32Type>(); +const auto INT64 = std::make_shared<::arrow::Int64Type>(); +const auto FLOAT = std::make_shared<::arrow::FloatType>(); +const auto DOUBLE = std::make_shared<::arrow::DoubleType>(); +const auto UTF8 = std::make_shared<::arrow::StringType>(); +const auto TIMESTAMP_MS = std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI); +// TODO: This requires parquet-cpp implementing the MICROS enum value +// const auto TIMESTAMP_US = std::make_shared(TimestampType::Unit::MICRO); +const auto BINARY = std::make_shared<::arrow::ListType>(std::make_shared("", UINT8)); +const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4); + +class TestConvertParquetSchema : public ::testing::Test { + public: + virtual void SetUp() {} + + void CheckFlatSchema(const std::shared_ptr<::arrow::Schema>& expected_schema) { + ASSERT_EQ(expected_schema->num_fields(), result_schema_->num_fields()); + for (int i = 0; i < expected_schema->num_fields(); ++i) { + auto lhs = result_schema_->field(i); + auto rhs = expected_schema->field(i); + EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString() + << " != " << rhs->ToString(); + } + } + + ::arrow::Status ConvertSchema(const std::vector& nodes) { + NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); + descr_.Init(schema); + return FromParquetSchema(&descr_, &result_schema_); + } + + protected: + SchemaDescriptor descr_; + std::shared_ptr<::arrow::Schema> result_schema_; +}; + +TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { + std::vector parquet_fields; + std::vector> arrow_fields; + + parquet_fields.push_back( + PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN)); + arrow_fields.push_back(std::make_shared("boolean", BOOL, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32)); + arrow_fields.push_back(std::make_shared("int32", INT32, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64)); + arrow_fields.push_back(std::make_shared("int64", INT64, false)); + + parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, + ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); + arrow_fields.push_back(std::make_shared("timestamp", TIMESTAMP_MS, false)); + + // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, + // ParquetType::INT64, LogicalType::TIMESTAMP_MICROS)); + // arrow_fields.push_back(std::make_shared("timestamp", TIMESTAMP_US, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT)); + arrow_fields.push_back(std::make_shared("float", FLOAT)); + + parquet_fields.push_back( + PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE)); + 
arrow_fields.push_back(std::make_shared("double", DOUBLE)); + + parquet_fields.push_back( + PrimitiveNode::Make("binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY)); + arrow_fields.push_back(std::make_shared("binary", BINARY)); + + parquet_fields.push_back(PrimitiveNode::Make( + "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8)); + arrow_fields.push_back(std::make_shared("string", UTF8)); + + parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL, + ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, 12)); + arrow_fields.push_back(std::make_shared("flba-binary", BINARY)); + + auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); + ASSERT_OK(ConvertSchema(parquet_fields)); + + CheckFlatSchema(arrow_schema); +} + +TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) { + std::vector parquet_fields; + std::vector> arrow_fields; + + parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL, + ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 4, 8, 4)); + arrow_fields.push_back(std::make_shared("flba-decimal", DECIMAL_8_4)); + + parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL, + ParquetType::BYTE_ARRAY, LogicalType::DECIMAL, -1, 8, 4)); + arrow_fields.push_back(std::make_shared("binary-decimal", DECIMAL_8_4)); + + parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL, + ParquetType::INT32, LogicalType::DECIMAL, -1, 8, 4)); + arrow_fields.push_back(std::make_shared("int32-decimal", DECIMAL_8_4)); + + parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL, + ParquetType::INT64, LogicalType::DECIMAL, -1, 8, 4)); + arrow_fields.push_back(std::make_shared("int64-decimal", DECIMAL_8_4)); + + auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); + ASSERT_OK(ConvertSchema(parquet_fields)); + + CheckFlatSchema(arrow_schema); +} + +TEST_F(TestConvertParquetSchema, UnsupportedThings) { + std::vector unsupported_nodes; + + unsupported_nodes.push_back( + PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96)); + + unsupported_nodes.push_back( + GroupNode::Make("repeated-group", Repetition::REPEATED, {})); + + unsupported_nodes.push_back(PrimitiveNode::Make( + "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE)); + + for (const NodePtr& node : unsupported_nodes) { + ASSERT_RAISES(NotImplemented, ConvertSchema({node})); + } +} + +class TestConvertArrowSchema : public ::testing::Test { + public: + virtual void SetUp() {} + + void CheckFlatSchema(const std::vector& nodes) { + NodePtr schema_node = GroupNode::Make("schema", Repetition::REPEATED, nodes); + const GroupNode* expected_schema_node = + static_cast(schema_node.get()); + const GroupNode* result_schema_node = + result_schema_->group_node(); + + ASSERT_EQ(expected_schema_node->field_count(), result_schema_node->field_count()); + + for (int i = 0; i < expected_schema_node->field_count(); i++) { + auto lhs = result_schema_node->field(i); + auto rhs = expected_schema_node->field(i); + EXPECT_TRUE(lhs->Equals(rhs.get())); + } + } + + ::arrow::Status ConvertSchema(const std::vector>& fields) { + arrow_schema_ = std::make_shared<::arrow::Schema>(fields); + std::shared_ptr<::parquet::WriterProperties> properties = + ::parquet::default_writer_properties(); + return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_); + } + + protected: + std::shared_ptr<::arrow::Schema> arrow_schema_; + 
std::shared_ptr result_schema_; +}; + +TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) { + std::vector parquet_fields; + std::vector> arrow_fields; + + parquet_fields.push_back( + PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN)); + arrow_fields.push_back(std::make_shared("boolean", BOOL, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32)); + arrow_fields.push_back(std::make_shared("int32", INT32, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64)); + arrow_fields.push_back(std::make_shared("int64", INT64, false)); + + parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, + ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); + arrow_fields.push_back(std::make_shared("timestamp", TIMESTAMP_MS, false)); + + // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, + // ParquetType::INT64, LogicalType::TIMESTAMP_MICROS)); + // arrow_fields.push_back(std::make_shared("timestamp", TIMESTAMP_US, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT)); + arrow_fields.push_back(std::make_shared("float", FLOAT)); + + parquet_fields.push_back( + PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE)); + arrow_fields.push_back(std::make_shared("double", DOUBLE)); + + // TODO: String types need to be clarified a bit more in the Arrow spec + parquet_fields.push_back(PrimitiveNode::Make( + "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8)); + arrow_fields.push_back(std::make_shared("string", UTF8)); + + ASSERT_OK(ConvertSchema(arrow_fields)); + + CheckFlatSchema(parquet_fields); +} + +TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) { + std::vector parquet_fields; + std::vector> arrow_fields; + + // TODO: Test Decimal Arrow -> Parquet conversion + + ASSERT_OK(ConvertSchema(arrow_fields)); + + CheckFlatSchema(parquet_fields); +} + +TEST(TestNodeConversion, DateAndTime) {} + +} // namespace arrow + +} // namespace parquet diff --git a/src/parquet/arrow/io.cc b/src/parquet/arrow/io.cc new file mode 100644 index 00000000..3de8eb11 --- /dev/null +++ b/src/parquet/arrow/io.cc @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
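+
+// Usage sketch for the allocator bridge implemented in this file: route
+// parquet-cpp's allocations through an Arrow MemoryPool so all memory is
+// tracked in one place.
+//
+//   ParquetAllocator allocator(::arrow::default_memory_pool());
+//   uint8_t* buffer = allocator.Malloc(128);  // throws ParquetException on failure
+//   allocator.Free(buffer, 128);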
+ +#include "parquet/arrow/io.h" + +#include +#include + +#include "parquet/api/io.h" +#include "parquet/arrow/utils.h" + +#include "arrow/util/memory-pool.h" +#include "arrow/util/status.h" + +using arrow::Status; +using arrow::MemoryPool; + +// To assist with readability +using ArrowROFile = arrow::io::RandomAccessFile; + +namespace parquet { +namespace arrow { + +// ---------------------------------------------------------------------- +// ParquetAllocator + +ParquetAllocator::ParquetAllocator() : pool_(::arrow::default_memory_pool()) {} + +ParquetAllocator::ParquetAllocator(MemoryPool* pool) : pool_(pool) {} + +ParquetAllocator::~ParquetAllocator() {} + +uint8_t* ParquetAllocator::Malloc(int64_t size) { + uint8_t* result; + PARQUET_THROW_NOT_OK(pool_->Allocate(size, &result)); + return result; +} + +void ParquetAllocator::Free(uint8_t* buffer, int64_t size) { + // Does not report Status + pool_->Free(buffer, size); +} + +// ---------------------------------------------------------------------- +// ParquetReadSource + +ParquetReadSource::ParquetReadSource(ParquetAllocator* allocator) + : file_(nullptr), allocator_(allocator) {} + +Status ParquetReadSource::Open(const std::shared_ptr& file) { + int64_t file_size; + RETURN_NOT_OK(file->GetSize(&file_size)); + + file_ = file; + size_ = file_size; + return Status::OK(); +} + +void ParquetReadSource::Close() { + // TODO(wesm): Make this a no-op for now. This leaves Python wrappers for + // these classes in a borked state. Probably better to explicitly close. + + // PARQUET_THROW_NOT_OK(file_->Close()); +} + +int64_t ParquetReadSource::Tell() const { + int64_t position; + PARQUET_THROW_NOT_OK(file_->Tell(&position)); + return position; +} + +void ParquetReadSource::Seek(int64_t position) { + PARQUET_THROW_NOT_OK(file_->Seek(position)); +} + +int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) { + int64_t bytes_read; + PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out)); + return bytes_read; +} + +std::shared_ptr<::parquet::Buffer> ParquetReadSource::Read(int64_t nbytes) { + // TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests + // that there should be more code sharing amongst file-like sources + auto result = std::make_shared<::parquet::OwnedMutableBuffer>(0, allocator_); + result->Resize(nbytes); + + int64_t bytes_read = Read(nbytes, result->mutable_data()); + if (bytes_read < nbytes) { result->Resize(bytes_read); } + return result; +} + +} // namespace arrow +} // namespace parquet diff --git a/src/parquet/arrow/io.h b/src/parquet/arrow/io.h new file mode 100644 index 00000000..feab7c53 --- /dev/null +++ b/src/parquet/arrow/io.h @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Bridges Arrow's IO interfaces and Parquet-cpp's IO interfaces + +#ifndef PARQUET_ARROW_IO_H +#define PARQUET_ARROW_IO_H + +#include +#include + +#include "parquet/api/io.h" + +#include "arrow/io/interfaces.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class MemoryPool; + +} // namespace arrow + +namespace parquet { + +namespace arrow { + +// An implementation of the Parquet MemoryAllocator API that plugs into an +// existing Arrow memory pool. This way we can direct all allocations to a +// single place rather than tracking allocations in different locations (for +// example: without utilizing parquet-cpp's default allocator) +class PARQUET_EXPORT ParquetAllocator : public MemoryAllocator { + public: + // Uses the default memory pool + ParquetAllocator(); + + explicit ParquetAllocator(::arrow::MemoryPool* pool); + virtual ~ParquetAllocator(); + + uint8_t* Malloc(int64_t size) override; + void Free(uint8_t* buffer, int64_t size) override; + + void set_pool(::arrow::MemoryPool* pool) { pool_ = pool; } + + ::arrow::MemoryPool* pool() const { return pool_; } + + private: + ::arrow::MemoryPool* pool_; +}; + +class PARQUET_EXPORT ParquetReadSource : public RandomAccessSource { + public: + explicit ParquetReadSource(ParquetAllocator* allocator); + + // We need to ask for the file size on opening the file, and this can fail + ::arrow::Status Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file); + + void Close() override; + int64_t Tell() const override; + void Seek(int64_t pos) override; + int64_t Read(int64_t nbytes, uint8_t* out) override; + std::shared_ptr Read(int64_t nbytes) override; + + private: + // An Arrow readable file of some kind + std::shared_ptr<::arrow::io::RandomAccessFile> file_; + + // The allocator is required for creating managed buffers + ParquetAllocator* allocator_; +}; + +} // namespace arrow +} // namespace parquet + +#endif // PARQUET_ARROW_IO_H diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc new file mode 100644 index 00000000..660938dd --- /dev/null +++ b/src/parquet/arrow/reader.cc @@ -0,0 +1,410 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
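+
+// Usage sketch for the read path defined below (`file` is any
+// ::arrow::io::RandomAccessFile implementation):
+//
+//   ParquetAllocator allocator;
+//   std::unique_ptr<FileReader> reader;
+//   RETURN_NOT_OK(OpenFile(file, &allocator, &reader));
+//   std::shared_ptr<::arrow::Table> table;
+//   RETURN_NOT_OK(reader->ReadFlatTable(&table));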
+ +#include "parquet/arrow/reader.h" + +#include +#include +#include +#include + +#include "parquet/arrow/io.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/utils.h" + +#include "arrow/column.h" +#include "arrow/schema.h" +#include "arrow/table.h" +#include "arrow/types/primitive.h" +#include "arrow/types/string.h" +#include "arrow/util/status.h" + +using arrow::Array; +using arrow::Column; +using arrow::Field; +using arrow::MemoryPool; +using arrow::PoolBuffer; +using arrow::Status; +using arrow::Table; + +using parquet::ColumnReader; +using parquet::Repetition; +using parquet::TypedColumnReader; + +// Help reduce verbosity +using ParquetRAS = parquet::RandomAccessSource; +using ParquetReader = parquet::ParquetFileReader; + +namespace parquet { +namespace arrow { + +template +struct ArrowTypeTraits { + typedef ::arrow::NumericBuilder builder_type; +}; + +template <> +struct ArrowTypeTraits { + typedef ::arrow::BooleanBuilder builder_type; +}; + +template +using BuilderType = typename ArrowTypeTraits::builder_type; + +class FileReader::Impl { + public: + Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); + virtual ~Impl() {} + + bool CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr); + Status GetFlatColumn(int i, std::unique_ptr* out); + Status ReadFlatColumn(int i, std::shared_ptr* out); + Status ReadFlatTable(std::shared_ptr
<Table>* out);
+
+ private:
+ MemoryPool* pool_;
+ std::unique_ptr<::parquet::ParquetFileReader> reader_;
+};
+
+class FlatColumnReader::Impl {
+ public:
+ Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
+ ::parquet::ParquetFileReader* reader, int column_index);
+ virtual ~Impl() {}
+
+ Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
+ template <typename ArrowType, typename ParquetType>
+ Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
+
+ template <typename ArrowType, typename ParquetType>
+ Status ReadNullableFlatBatch(const int16_t* def_levels,
+ typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
+ BuilderType<ArrowType>* builder);
+ template <typename ArrowType, typename ParquetType>
+ Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read,
+ BuilderType<ArrowType>* builder);
+
+ private:
+ void NextRowGroup();
+
+ template <typename InType, typename OutType>
+ struct can_copy_ptr {
+ static constexpr bool value =
+ std::is_same<InType, OutType>::value ||
+ (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
+ (sizeof(InType) == sizeof(OutType)));
+ };
+
+ template <typename InType, typename OutType,
+ typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+ Status ConvertPhysicalType(
+ const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+ *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
+ return Status::OK();
+ }
+
+ template <typename InType, typename OutType,
+ typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+ Status ConvertPhysicalType(
+ const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+ RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType)));
+ OutType* mutable_out_ptr =
+ reinterpret_cast<OutType*>(values_builder_buffer_.mutable_data());
+ std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
+ *out_ptr = mutable_out_ptr;
+ return Status::OK();
+ }
+
+ MemoryPool* pool_;
+ const ::parquet::ColumnDescriptor* descr_;
+ ::parquet::ParquetFileReader* reader_;
+ int column_index_;
+ int next_row_group_;
+ std::shared_ptr<ColumnReader> column_reader_;
+ std::shared_ptr<Field> field_;
+
+ PoolBuffer values_buffer_;
+ PoolBuffer def_levels_buffer_;
+ PoolBuffer values_builder_buffer_;
+ PoolBuffer valid_bytes_buffer_;
+};
+
+FileReader::Impl::Impl(
+ MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
+ : pool_(pool), reader_(std::move(reader)) {}
+
+bool FileReader::Impl::CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr) {
+ if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) {
+ return false;
+ } else if ((descr->max_definition_level() == 1) &&
+ (descr->schema_node()->repetition() != Repetition::OPTIONAL)) {
+ return false;
+ }
+ return true;
+}
+
+Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
+ const SchemaDescriptor* schema = reader_->metadata()->schema();
+
+ if (!CheckForFlatColumn(schema->Column(i))) {
+ return Status::Invalid("The requested column is not flat");
+ }
+ std::unique_ptr<FlatColumnReader::Impl> impl(
+ new FlatColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i));
+ *out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl)));
+ return Status::OK();
+}
+
+Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
+ std::unique_ptr<FlatColumnReader> flat_column_reader;
+ RETURN_NOT_OK(GetFlatColumn(i, &flat_column_reader));
+ return flat_column_reader->NextBatch(reader_->metadata()->num_rows(), out);
+}
+
+Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) {
+ auto descr = reader_->metadata()->schema();
+
+ const std::string& name = descr->name();
+ std::shared_ptr<::arrow::Schema> schema;
+ RETURN_NOT_OK(FromParquetSchema(descr, &schema));
+
+ int num_columns = reader_->metadata()->num_columns();
+
+ std::vector<std::shared_ptr<Column>> columns(num_columns);
+ for (int i = 0; i < num_columns; i++) {
+ std::shared_ptr<Array> array;
+ RETURN_NOT_OK(ReadFlatColumn(i, &array));
+ columns[i] = std::make_shared<Column>(schema->field(i), array);
+ }
+
+ *table = std::make_shared<Table>(name, schema, columns);
+ return Status::OK();
+}
+
+FileReader::FileReader(
+ MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
+ : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+
+FileReader::~FileReader() {}
+
+// Static ctor
+Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+ ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader) {
+ std::unique_ptr<ParquetReadSource> source(new ParquetReadSource(allocator));
+ RETURN_NOT_OK(source->Open(file));
+
+ // TODO(wesm): reader properties
+ std::unique_ptr<ParquetReader> pq_reader;
+ PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(std::move(source)));
+
+ // Use the same memory pool as the ParquetAllocator
+ reader->reset(new FileReader(allocator->pool(), std::move(pq_reader)));
+ return Status::OK();
+}
+
+Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
+ return impl_->GetFlatColumn(i, out);
+}
+
+Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
+ return impl_->ReadFlatColumn(i, out);
+}
+
+Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) {
+ return impl_->ReadFlatTable(out);
+}
+
+FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
+ ::parquet::ParquetFileReader* reader, int column_index)
+ : pool_(pool),
+ descr_(descr),
+ reader_(reader),
+ column_index_(column_index),
+ next_row_group_(0),
+ values_buffer_(pool),
+ def_levels_buffer_(pool) {
+ NodeToField(descr_->schema_node(), &field_);
+ NextRowGroup();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values,
+ int64_t values_read, BuilderType<ArrowType>* builder) {
+ using ArrowCType = typename ArrowType::c_type;
+ using ParquetCType = typename ParquetType::c_type;
+
+ DCHECK(builder);
+ const ArrowCType* values_ptr = nullptr;
+ RETURN_NOT_OK(
+ (ConvertPhysicalType<ParquetCType, ArrowCType>(values, values_read, &values_ptr)));
+ RETURN_NOT_OK(builder->Append(values_ptr, values_read));
+ return Status::OK();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
+ typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
+ BuilderType<ArrowType>* builder) {
+ using ArrowCType = typename ArrowType::c_type;
+
+ DCHECK(builder);
+ RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType)));
+ RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
+ auto values_ptr = reinterpret_cast<ArrowCType*>(values_builder_buffer_.mutable_data());
+ uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
+ int values_idx = 0;
+ for (int64_t i = 0; i < levels_read; i++) {
+ if (def_levels[i] < descr_->max_definition_level()) {
+ valid_bytes[i] = 0;
+ } else {
+ valid_bytes[i] = 1;
+ values_ptr[i] = values[values_idx++];
+ }
+ }
+ RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes));
+ return Status::OK();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::TypedReadBatch(
+ int batch_size, std::shared_ptr<Array>* out) {
+ using ParquetCType = typename ParquetType::c_type;
+
+ int values_to_read = batch_size;
+ BuilderType<ArrowType> builder(pool_, field_->type);
+ while ((values_to_read > 0) && column_reader_) {
+ values_buffer_.Resize(values_to_read * sizeof(ParquetCType));
+ if (descr_->max_definition_level() > 0) {
+ def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
+ }
+ auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
+ int64_t values_read;
+ int64_t levels_read;
+ int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+ auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
+ PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+ values_to_read, def_levels, nullptr, values, &values_read));
+ values_to_read -= levels_read;
+ if (descr_->max_definition_level() == 0) {
+ RETURN_NOT_OK(
+ (ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read, &builder)));
+ } else {
+ // As per the definition and checks for flat columns:
+ // descr_->max_definition_level() == 1
+ RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>(
+ def_levels, values, values_read, levels_read, &builder)));
+ }
+ if (!column_reader_->HasNext()) { NextRowGroup(); }
+ }
+ *out = builder.Finish();
+ return Status::OK();
+}
+
+template <>
+Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
+ int batch_size, std::shared_ptr<Array>* out) {
+ int values_to_read = batch_size;
+ ::arrow::StringBuilder builder(pool_, field_->type);
+ while ((values_to_read > 0) && column_reader_) {
+ values_buffer_.Resize(values_to_read * sizeof(::parquet::ByteArray));
+ if (descr_->max_definition_level() > 0) {
+ def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
+    }
+    auto reader =
+        dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get());
+    int64_t values_read;
+    int64_t levels_read;
+    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    auto values = reinterpret_cast<::parquet::ByteArray*>(values_buffer_.mutable_data());
+    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+                             values_to_read, def_levels, nullptr, values, &values_read));
+    values_to_read -= levels_read;
+    if (descr_->max_definition_level() == 0) {
+      for (int64_t i = 0; i < levels_read; i++) {
+        RETURN_NOT_OK(
+            builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
+      }
+    } else {
+      // descr_->max_definition_level() == 1
+      int values_idx = 0;
+      for (int64_t i = 0; i < levels_read; i++) {
+        if (def_levels[i] < descr_->max_definition_level()) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else {
+          RETURN_NOT_OK(
+              builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
+                  values[values_idx].len));
+          values_idx++;
+        }
+      }
+    }
+    if (!column_reader_->HasNext()) { NextRowGroup(); }
+  }
+  *out = builder.Finish();
+  return Status::OK();
+}
+
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)              \
+  case ::arrow::Type::ENUM:                                         \
+    return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
+    break;
+
+Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+  if (!column_reader_) {
+    // Exhausted all row groups.
+    *out = nullptr;
+    return Status::OK();
+  }
+
+  switch (field_->type->type) {
+    TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+    TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+    TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
+    TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+    TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
+    TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
+    TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+    TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+    TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
+    TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
+    TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+    TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
+    TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+    default:
+      return Status::NotImplemented(field_->type->ToString());
+  }
+}
+
+void FlatColumnReader::Impl::NextRowGroup() {
+  if (next_row_group_ < reader_->metadata()->num_row_groups()) {
+    column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_);
+    next_row_group_++;
+  } else {
+    column_reader_ = nullptr;
+  }
+}
+
+FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
+
+FlatColumnReader::~FlatColumnReader() {}
+
+Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+  return impl_->NextBatch(batch_size, out);
+}
+
+}  // namespace arrow
+}  // namespace parquet
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
new file mode 100644
index 00000000..997fd7ac
--- /dev/null
+++ b/src/parquet/arrow/reader.h
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_READER_H
+#define PARQUET_ARROW_READER_H
+
+#include <memory>
+
+#include "parquet/api/reader.h"
+#include "parquet/api/schema.h"
+#include "parquet/arrow/io.h"
+
+#include "arrow/io/interfaces.h"
+
+namespace arrow {
+
+class Array;
+class MemoryPool;
+class RowBatch;
+class Status;
+class Table;
+
+}
+
+namespace parquet {
+
+namespace arrow {
+
+class FlatColumnReader;
+
+// Arrow read adapter class for deserializing Parquet files as Arrow row
+// batches.
+//
+// TODO(wesm): nested data does not always make sense with this user
+// interface unless you are only reading a single leaf node from a branch of
+// a table. For example:
+//
+// repeated group data {
+//   optional group record {
+//     optional int32 val1;
+//     optional byte_array val2;
+//     optional bool val3;
+//   }
+//   optional int32 val4;
+// }
+//
+// In the Parquet file, there are 4 leaf nodes:
+//
+// * data.record.val1
+// * data.record.val2
+// * data.record.val3
+// * data.val4
+//
+// When materializing this data in an Arrow array, we would have:
+//
+//   data: list<struct<
+//     record: struct<
+//       val1: int32,
+//       val2: string (= list<uint8>),
+//       val3: bool,
+//     >,
+//     val4: int32
+//   >>
+//
+// However, in the Parquet format, each leaf node has its own repetition and
+// definition levels describing the structure of the intermediate nodes in
+// this array structure. Thus, we will need to scan the leaf data for a group
+// of leaf nodes part of the same type tree to create a single result Arrow
+// nested array structure.
+//
+// This is additionally complicated by "chunky" repeated fields or very large
+// byte arrays.
+class PARQUET_EXPORT FileReader {
+ public:
+  FileReader(::arrow::MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader);
+
+  // Since the distribution of columns amongst a Parquet file's row groups may
+  // be uneven (the number of values in each column chunk can be different), we
+  // provide a column-oriented read interface. The ColumnReader hides the
+  // details of paging through the file's row groups and yielding
+  // fully-materialized arrow::Array instances.
+  //
+  // Returns an error status if the column of interest is not flat.
+  ::arrow::Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
+  // Read column as a whole into an Array.
+  ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out);
+  // Read a table of flat columns into a Table.
+  ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out);
+
+  virtual ~FileReader();
+
+ private:
+  class PARQUET_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+// At this point, the column reader is a stream iterator. It only knows how to
+// read the next batch of values for a particular column from the file until it
+// runs out.
+//
+// We also do not expose any internal Parquet details, such as row groups. This
+// might change in the future.
+class PARQUET_EXPORT FlatColumnReader {
+ public:
+  virtual ~FlatColumnReader();
+
+  // Scan the next array of the indicated size. The actual size of the
+  // returned array may be less than the passed size depending on how much
+  // data is available in the file.
+ // + // When all the data in the file has been exhausted, the result is set to + // nullptr. + // + // Returns Status::OK on a successful read, including if you have exhausted + // the data available in the file. + ::arrow::Status NextBatch(int batch_size, std::shared_ptr<::arrow::Array>* out); + + private: + class PARQUET_NO_EXPORT Impl; + std::unique_ptr impl_; + explicit FlatColumnReader(std::unique_ptr impl); + + friend class FileReader; +}; + +// Helper function to create a file reader from an implementation of an Arrow +// readable file +PARQUET_EXPORT +::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, + ParquetAllocator* allocator, std::unique_ptr* reader); + +} // namespace arrow +} // namespace parquet + +#endif // PARQUET_ARROW_READER_H diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc new file mode 100644 index 00000000..5a38a286 --- /dev/null +++ b/src/parquet/arrow/schema.cc @@ -0,0 +1,351 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "parquet/arrow/schema.h" + +#include +#include + +#include "parquet/api/schema.h" +#include "parquet/arrow/utils.h" + +#include "arrow/types/decimal.h" +#include "arrow/types/string.h" +#include "arrow/util/status.h" + +using arrow::Field; +using arrow::Status; +using arrow::TypePtr; + +using ArrowType = arrow::Type; + +using parquet::Repetition; +using parquet::schema::Node; +using parquet::schema::NodePtr; +using parquet::schema::GroupNode; +using parquet::schema::PrimitiveNode; + +using ParquetType = parquet::Type; +using parquet::LogicalType; + +namespace parquet { + +namespace arrow { + +const auto BOOL = std::make_shared<::arrow::BooleanType>(); +const auto UINT8 = std::make_shared<::arrow::UInt8Type>(); +const auto INT8 = std::make_shared<::arrow::Int8Type>(); +const auto UINT16 = std::make_shared<::arrow::UInt16Type>(); +const auto INT16 = std::make_shared<::arrow::Int16Type>(); +const auto UINT32 = std::make_shared<::arrow::UInt32Type>(); +const auto INT32 = std::make_shared<::arrow::Int32Type>(); +const auto UINT64 = std::make_shared<::arrow::UInt64Type>(); +const auto INT64 = std::make_shared<::arrow::Int64Type>(); +const auto FLOAT = std::make_shared<::arrow::FloatType>(); +const auto DOUBLE = std::make_shared<::arrow::DoubleType>(); +const auto UTF8 = std::make_shared<::arrow::StringType>(); +const auto TIMESTAMP_MS = + std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI); +const auto BINARY = + std::make_shared<::arrow::ListType>(std::make_shared<::arrow::Field>("", UINT8)); + +TypePtr MakeDecimalType(const PrimitiveNode* node) { + int precision = node->decimal_metadata().precision; + int scale = node->decimal_metadata().scale; + return std::make_shared<::arrow::DecimalType>(precision, scale); +} + +static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) { + switch (node->logical_type()) { + case LogicalType::UTF8: + *out = UTF8; + break; + case LogicalType::DECIMAL: + *out = MakeDecimalType(node); + break; + default: + // BINARY + *out = BINARY; + break; + } + return Status::OK(); +} + +static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) { + switch (node->logical_type()) { + case LogicalType::NONE: + *out = BINARY; + break; + case LogicalType::DECIMAL: + *out = MakeDecimalType(node); + break; + default: + return Status::NotImplemented("unhandled type"); + break; + } + + return Status::OK(); +} + +static Status FromInt32(const PrimitiveNode* node, TypePtr* out) { + switch (node->logical_type()) { + case LogicalType::NONE: + *out = INT32; + break; + case LogicalType::UINT_8: + *out = UINT8; + break; + case LogicalType::INT_8: + *out = INT8; + break; + case LogicalType::UINT_16: + *out = UINT16; + break; + case LogicalType::INT_16: + *out = INT16; + break; + case LogicalType::UINT_32: + *out = UINT32; + break; + case LogicalType::DECIMAL: + *out = MakeDecimalType(node); + break; + default: + return Status::NotImplemented("Unhandled logical type for int32"); + break; + } + return Status::OK(); +} + +static Status FromInt64(const PrimitiveNode* node, TypePtr* out) { + switch (node->logical_type()) { + case LogicalType::NONE: + *out = INT64; + break; + case LogicalType::UINT_64: + *out = UINT64; + break; + case LogicalType::DECIMAL: + *out = MakeDecimalType(node); + break; + case LogicalType::TIMESTAMP_MILLIS: + *out = TIMESTAMP_MS; + break; + default: + return Status::NotImplemented("Unhandled logical type for int64"); + break; + } + return Status::OK(); +} + +// TODO: Logical Type Handling +Status NodeToField(const 
NodePtr& node, std::shared_ptr<Field>* out) {
+  std::shared_ptr<::arrow::DataType> type;
+
+  if (node->is_repeated()) {
+    return Status::NotImplemented("No support yet for repeated node types");
+  }
+
+  if (node->is_group()) {
+    const GroupNode* group = static_cast<const GroupNode*>(node.get());
+    std::vector<std::shared_ptr<Field>> fields(group->field_count());
+    for (int i = 0; i < group->field_count(); i++) {
+      RETURN_NOT_OK(NodeToField(group->field(i), &fields[i]));
+    }
+    type = std::make_shared<::arrow::StructType>(fields);
+  } else {
+    // Primitive (leaf) node
+    const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
+
+    switch (primitive->physical_type()) {
+      case ParquetType::BOOLEAN:
+        type = BOOL;
+        break;
+      case ParquetType::INT32:
+        RETURN_NOT_OK(FromInt32(primitive, &type));
+        break;
+      case ParquetType::INT64:
+        RETURN_NOT_OK(FromInt64(primitive, &type));
+        break;
+      case ParquetType::INT96:
+        // TODO: Do we have that type in Arrow?
+        // type = TypePtr(new Int96Type());
+        return Status::NotImplemented("int96");
+      case ParquetType::FLOAT:
+        type = FLOAT;
+        break;
+      case ParquetType::DOUBLE:
+        type = DOUBLE;
+        break;
+      case ParquetType::BYTE_ARRAY:
+        // TODO: Do we have that type in Arrow?
+        RETURN_NOT_OK(FromByteArray(primitive, &type));
+        break;
+      case ParquetType::FIXED_LEN_BYTE_ARRAY:
+        RETURN_NOT_OK(FromFLBA(primitive, &type));
+        break;
+    }
+  }
+
+  *out = std::make_shared<Field>(node->name(), type, !node->is_required());
+  return Status::OK();
+}
+
+Status FromParquetSchema(
+    const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) {
+  // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
+  // from the root Parquet node
+  const GroupNode* schema_node = parquet_schema->group_node();
+
+  std::vector<std::shared_ptr<Field>> fields(schema_node->field_count());
+  for (int i = 0; i < schema_node->field_count(); i++) {
+    RETURN_NOT_OK(NodeToField(schema_node->field(i), &fields[i]));
+  }
+
+  *out = std::make_shared<::arrow::Schema>(fields);
+  return Status::OK();
+}
+
+Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
+    const std::string& name, bool nullable, const WriterProperties& properties,
+    NodePtr* out) {
+  Repetition::type repetition = Repetition::REQUIRED;
+  if (nullable) { repetition = Repetition::OPTIONAL; }
+
+  std::vector<NodePtr> children(type->num_children());
+  for (int i = 0; i < type->num_children(); i++) {
+    RETURN_NOT_OK(FieldToNode(type->child(i), properties, &children[i]));
+  }
+
+  *out = GroupNode::Make(name, repetition, children);
+  return Status::OK();
+}
+
+Status FieldToNode(const std::shared_ptr<Field>& field,
+    const WriterProperties& properties, NodePtr* out) {
+  LogicalType::type logical_type = LogicalType::NONE;
+  ParquetType::type type;
+  Repetition::type repetition = Repetition::REQUIRED;
+  if (field->nullable) { repetition = Repetition::OPTIONAL; }
+  int length = -1;
+
+  switch (field->type->type) {
+    // TODO:
+    // case ArrowType::NA:
+    //   break;
+    case ArrowType::BOOL:
+      type = ParquetType::BOOLEAN;
+      break;
+    case ArrowType::UINT8:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::UINT_8;
+      break;
+    case ArrowType::INT8:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::INT_8;
+      break;
+    case ArrowType::UINT16:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::UINT_16;
+      break;
+    case ArrowType::INT16:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::INT_16;
+      break;
+    case ArrowType::UINT32:
+      if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
+        type = ParquetType::INT64;
+      } else {
+        type = ParquetType::INT32;
+        logical_type = LogicalType::UINT_32;
+      }
+      break;
+    case ArrowType::INT32:
+      type = ParquetType::INT32;
+      break;
+    case ArrowType::UINT64:
+      type = ParquetType::INT64;
+      logical_type = LogicalType::UINT_64;
+      break;
+    case ArrowType::INT64:
+      type = ParquetType::INT64;
+      break;
+    case ArrowType::FLOAT:
+      type = ParquetType::FLOAT;
+      break;
+    case ArrowType::DOUBLE:
+      type = ParquetType::DOUBLE;
+      break;
+    case ArrowType::STRING:
+      type = ParquetType::BYTE_ARRAY;
+      logical_type = LogicalType::UTF8;
+      break;
+    case ArrowType::BINARY:
+      type = ParquetType::BYTE_ARRAY;
+      break;
+    case ArrowType::DATE:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::DATE;
+      break;
+    case ArrowType::TIMESTAMP: {
+      auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type.get());
+      if (timestamp_type->unit != ::arrow::TimestampType::Unit::MILLI) {
+        return Status::NotImplemented(
+            "Timestamp units other than milliseconds are not yet supported with Parquet.");
+      }
+      type = ParquetType::INT64;
+      logical_type = LogicalType::TIMESTAMP_MILLIS;
+    } break;
+    case ArrowType::TIMESTAMP_DOUBLE:
+      type = ParquetType::INT64;
+      // This is specified as seconds since the UNIX epoch
+      // TODO: Converted type in Parquet?
+      // logical_type = LogicalType::TIMESTAMP_MILLIS;
+      break;
+    case ArrowType::TIME:
+      type = ParquetType::INT64;
+      logical_type = LogicalType::TIME_MILLIS;
+      break;
+    case ArrowType::STRUCT: {
+      auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type);
+      return StructToNode(struct_type, field->name, field->nullable, properties, out);
+    } break;
+    default:
+      // TODO: LIST, DENSE_UNION, SPARSE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
+      return Status::NotImplemented("unhandled type");
+  }
+  *out = PrimitiveNode::Make(field->name, repetition, type, logical_type, length);
+  return Status::OK();
+}
+
+Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
+    const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out) {
+  std::vector<NodePtr> nodes(arrow_schema->num_fields());
+  for (int i = 0; i < arrow_schema->num_fields(); i++) {
+    RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i]));
+  }
+
+  NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
+  *out = std::make_shared<::parquet::SchemaDescriptor>();
+  PARQUET_CATCH_NOT_OK((*out)->Init(schema));
+
+  return Status::OK();
+}
+
+}  // namespace arrow
+
+}  // namespace parquet
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
new file mode 100644
index 00000000..0baed3cc
--- /dev/null
+++ b/src/parquet/arrow/schema.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
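A short sketch of how the two conversion directions implemented above fit together. This is illustrative only and assumes default_writer_properties() from parquet/api/writer.h; it is not code from the patch.

    #include <memory>

    #include "arrow/util/status.h"
    #include "parquet/arrow/schema.h"

    ::arrow::Status RoundTripSchema(const std::shared_ptr<::arrow::Schema>& schema,
        std::shared_ptr<::arrow::Schema>* out) {
      // Arrow -> Parquet: nullable fields become OPTIONAL nodes,
      // required fields become REQUIRED nodes.
      std::shared_ptr<::parquet::SchemaDescriptor> descr;
      RETURN_NOT_OK(parquet::arrow::ToParquetSchema(
          schema.get(), *::parquet::default_writer_properties(), &descr));

      // Parquet -> Arrow: logical types (UINT_8, UTF8, TIMESTAMP_MILLIS, ...)
      // map back onto the corresponding Arrow types.
      return parquet::arrow::FromParquetSchema(descr.get(), out);
    }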
+ +#ifndef PARQUET_ARROW_SCHEMA_H +#define PARQUET_ARROW_SCHEMA_H + +#include + +#include "parquet/api/schema.h" +#include "parquet/api/writer.h" + +#include +#include +#include + +namespace arrow { + +class Status; + +} // namespace arrow + +namespace parquet { + +namespace arrow { + +::arrow::Status PARQUET_EXPORT NodeToField( + const schema::NodePtr& node, std::shared_ptr<::arrow::Field>* out); + +::arrow::Status PARQUET_EXPORT FromParquetSchema( + const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out); + +::arrow::Status PARQUET_EXPORT FieldToNode(const std::shared_ptr<::arrow::Field>& field, + const WriterProperties& properties, schema::NodePtr* out); + +::arrow::Status PARQUET_EXPORT ToParquetSchema(const ::arrow::Schema* arrow_schema, + const WriterProperties& properties, std::shared_ptr* out); + +} // namespace arrow + +} // namespace parquet + +#endif // PARQUET_ARROW_SCHEMA_H diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h new file mode 100644 index 00000000..d1358373 --- /dev/null +++ b/src/parquet/arrow/test-util.h @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
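The reader and writer in this series revolve around one invariant for flat columns: max_definition_level() is either 0 (REQUIRED, no nulls) or 1 (OPTIONAL, nulls allowed). The following standalone sketch (illustration only, with int32 hard-coded; not part of the patch) shows the expansion the reader performs from Parquet's dense value stream plus definition levels into Arrow's slot-per-row layout with validity bytes:

    #include <cstdint>
    #include <vector>

    void ExpandDefLevels(const std::vector<int16_t>& def_levels,
        const std::vector<int32_t>& dense_values, int16_t max_def_level,
        std::vector<int32_t>* slots, std::vector<uint8_t>* valid_bytes) {
      size_t values_idx = 0;
      slots->resize(def_levels.size());
      valid_bytes->assign(def_levels.size(), 0);
      for (size_t i = 0; i < def_levels.size(); i++) {
        // A definition level below the maximum marks a null at this position.
        if (def_levels[i] == max_def_level) {
          (*valid_bytes)[i] = 1;
          (*slots)[i] = dense_values[values_idx++];
        }
      }
    }

For example, def_levels {1, 0, 1} with dense_values {7, 8} yield valid_bytes {1, 0, 1} and slots {7, _, 8}; the writer side performs the inverse compaction.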
+
+#include <type_traits>
+#include <vector>
+
+#include "arrow/test-util.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+
+namespace parquet {
+
+namespace arrow {
+
+template <typename ArrowType>
+using is_arrow_float = std::is_floating_point<typename ArrowType::c_type>;
+
+template <typename ArrowType>
+using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
+
+template <typename ArrowType>
+using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;
+
+template <typename ArrowType>
+typename std::enable_if<is_arrow_float<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NonNullArray(size_t size) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::random_real(size, 0, 0, 1, &values);
+  ::arrow::NumericBuilder<ArrowType> builder(::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+template <typename ArrowType>
+typename std::enable_if<is_arrow_int<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NonNullArray(size_t size) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::randint(size, 0, 64, &values);
+  ::arrow::NumericBuilder<ArrowType> builder(::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+template <typename ArrowType>
+typename std::enable_if<is_arrow_string<ArrowType>::value,
+    std::shared_ptr<::arrow::StringArray>>::type
+NonNullArray(size_t size) {
+  ::arrow::StringBuilder builder(::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>());
+  for (size_t i = 0; i < size; i++) {
+    builder.Append("test-string");
+  }
+  return std::static_pointer_cast<::arrow::StringArray>(builder.Finish());
+}
+
+template <>
+std::shared_ptr<::arrow::PrimitiveArray> NonNullArray<::arrow::BooleanType>(size_t size) {
+  std::vector<uint8_t> values;
+  ::arrow::test::randint(size, 0, 1, &values);
+  ::arrow::BooleanBuilder builder(::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+// This helper function only supports (size/2) nulls.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_float<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::random_real(size, 0, 0, 1, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::NumericBuilder<ArrowType> builder(::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+// This helper function only supports (size/2) nulls.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_int<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::randint(size, 0, 64, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::NumericBuilder<ArrowType> builder(::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+// This helper function only supports (size/2) nulls.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_string<ArrowType>::value,
+    std::shared_ptr<::arrow::StringArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::StringBuilder builder(::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>());
+  for (size_t i = 0; i < size; i++) {
+    if (valid_bytes[i]) {
+      builder.Append("test-string");
+    } else {
+      builder.AppendNull();
+    }
+  }
+  return std::static_pointer_cast<::arrow::StringArray>(builder.Finish());
+}
+
+// This helper function only supports (size/2) nulls.
+template <>
+std::shared_ptr<::arrow::PrimitiveArray> NullableArray<::arrow::BooleanType>(
+    size_t size, size_t num_nulls) {
+  std::vector<uint8_t> values;
+  ::arrow::test::randint(size, 0, 1, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::BooleanBuilder builder(::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+std::shared_ptr<::arrow::Column> MakeColumn(
+    const std::string& name, const std::shared_ptr<::arrow::Array>& array, bool nullable) {
+  auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable);
+  return std::make_shared<::arrow::Column>(field, array);
+}
+
+std::shared_ptr<::arrow::Table> MakeSimpleTable(
+    const std::shared_ptr<::arrow::Array>& values, bool nullable) {
+  std::shared_ptr<::arrow::Column> column = MakeColumn("col", values, nullable);
+  std::vector<std::shared_ptr<::arrow::Column>> columns({column});
+  std::vector<std::shared_ptr<::arrow::Field>> fields({column->field()});
+  auto schema = std::make_shared<::arrow::Schema>(fields);
+  return std::make_shared<::arrow::Table>("table", schema, columns);
+}
+
+template <typename T>
+void ExpectArray(T* expected, ::arrow::Array* result) {
+  auto p_array = static_cast<::arrow::PrimitiveArray*>(result);
+  for (int i = 0; i < result->length(); i++) {
+    EXPECT_EQ(expected[i], reinterpret_cast<const T*>(p_array->data()->data())[i]);
+  }
+}
+
+template <typename ArrowType>
+void ExpectArray(typename ArrowType::c_type* expected, ::arrow::Array* result) {
+  ::arrow::PrimitiveArray* p_array = static_cast<::arrow::PrimitiveArray*>(result);
+  for (int64_t i = 0; i < result->length(); i++) {
+    EXPECT_EQ(expected[i],
+        reinterpret_cast<const typename ArrowType::c_type*>(p_array->data()->data())[i]);
+  }
+}
+
+template <>
+void ExpectArray<::arrow::BooleanType>(uint8_t* expected, ::arrow::Array* result) {
+  ::arrow::BooleanBuilder builder(::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  builder.Append(expected, result->length());
+  std::shared_ptr<::arrow::Array> expected_array = builder.Finish();
+  EXPECT_TRUE(result->Equals(expected_array));
+}
+
+}  // namespace arrow
+
+}  // namespace parquet
diff --git a/src/parquet/arrow/utils.h b/src/parquet/arrow/utils.h
new file mode 100644
index 00000000..8a085efa
--- /dev/null
+++ b/src/parquet/arrow/utils.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_ARROW_UTILS_H +#define PARQUET_ARROW_UTILS_H + +#include + +#include "arrow/util/status.h" +#include "parquet/exception.h" + +namespace parquet { +namespace arrow { + +#define PARQUET_CATCH_NOT_OK(s) \ + try { \ + (s); \ + } catch (const ::parquet::ParquetException& e) { return ::arrow::Status::Invalid(e.what()); } + +#define PARQUET_IGNORE_NOT_OK(s) \ + try { \ + (s); \ + } catch (const ::parquet::ParquetException& e) {} + +#define PARQUET_THROW_NOT_OK(s) \ + do { \ + ::arrow::Status _s = (s); \ + if (!_s.ok()) { \ + std::stringstream ss; \ + ss << "Arrow error: " << _s.ToString(); \ + throw ::parquet::ParquetException(ss.str()); \ + } \ + } while (0); + +} // namespace arrow +} // namespace parquet + +#endif // PARQUET_ARROW_UTILS_H diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc new file mode 100644 index 00000000..5d79ea92 --- /dev/null +++ b/src/parquet/arrow/writer.cc @@ -0,0 +1,373 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
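The two macros above bridge the error-handling styles of the two libraries in both directions. A minimal sketch of their intended use follows (illustrative only; the arrow header locations are assumptions):

    #include "arrow/util/buffer.h"   // assumed location of ::arrow::PoolBuffer
    #include "arrow/util/status.h"
    #include "parquet/api/writer.h"
    #include "parquet/arrow/utils.h"

    // parquet-cpp throws ParquetException; surface it as an arrow::Status.
    ::arrow::Status CallIntoParquet(::parquet::ColumnWriter* writer) {
      PARQUET_CATCH_NOT_OK(writer->Close());
      return ::arrow::Status::OK();
    }

    // Arrow returns Status; rethrow a failure as a ParquetException.
    void CallIntoArrow(::arrow::PoolBuffer* buffer) {
      PARQUET_THROW_NOT_OK(buffer->Resize(1024));
    }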
+
+#include "parquet/arrow/writer.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "parquet/arrow/schema.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/array.h"
+#include "arrow/column.h"
+#include "arrow/table.h"
+#include "arrow/types/construct.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/util/status.h"
+
+using arrow::MemoryPool;
+using arrow::PoolBuffer;
+using arrow::PrimitiveArray;
+using arrow::Status;
+using arrow::StringArray;
+using arrow::Table;
+
+using parquet::ParquetFileWriter;
+using parquet::ParquetVersion;
+using parquet::schema::GroupNode;
+
+namespace parquet {
+namespace arrow {
+
+class FileWriter::Impl {
+ public:
+  Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
+
+  Status NewRowGroup(int64_t chunk_size);
+  template <typename ParquetType, typename ArrowType>
+  Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data,
+      int64_t offset, int64_t length);
+
+  // TODO(uwe): Same code as in reader.cc; the only difference is the name of
+  // the temporary buffer
+  template <typename InType, typename OutType>
+  struct can_copy_ptr {
+    static constexpr bool value =
+        std::is_same<InType, OutType>::value ||
+        (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
+            (sizeof(InType) == sizeof(OutType)));
+  };
+
+  template <typename InType, typename OutType,
+      typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(const InType* in_ptr, int64_t, const OutType** out_ptr) {
+    *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
+    return Status::OK();
+  }
+
+  template <typename InType, typename OutType,
+      typename std::enable_if<!can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+    RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType)));
+    OutType* mutable_out_ptr = reinterpret_cast<OutType*>(data_buffer_.mutable_data());
+    std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
+    *out_ptr = mutable_out_ptr;
+    return Status::OK();
+  }
+
+  Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length);
+  Status WriteFlatColumnChunk(const StringArray* data, int64_t offset, int64_t length);
+  Status Close();
+
+  virtual ~Impl() {}
+
+ private:
+  friend class FileWriter;
+
+  MemoryPool* pool_;
+  // Buffer used for storing the data of an array converted to the physical type
+  // as expected by parquet-cpp.
+  PoolBuffer data_buffer_;
+  PoolBuffer def_levels_buffer_;
+  std::unique_ptr<::parquet::ParquetFileWriter> writer_;
+  ::parquet::RowGroupWriter* row_group_writer_;
+};
+
+FileWriter::Impl::Impl(
+    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
+    : pool_(pool),
+      data_buffer_(pool),
+      writer_(std::move(writer)),
+      row_group_writer_(nullptr) {}
+
+Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
+  if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+  PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup(chunk_size));
+  return Status::OK();
+}
+
+template <typename ParquetType, typename ArrowType>
+Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer,
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
+  using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+
+  DCHECK((offset + length) <= data->length());
+  auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset;
+  auto writer =
+      reinterpret_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer);
+  if (writer->descr()->max_definition_level() == 0) {
+    // no nulls, just dump the data
+    const ParquetCType* data_writer_ptr = nullptr;
+    RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
+        data_ptr, length, &data_writer_ptr)));
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_writer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+    int16_t* def_levels_ptr =
+        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    if (data->null_count() == 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+      const ParquetCType* data_writer_ptr = nullptr;
+      RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
+          data_ptr, length, &data_writer_ptr)));
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, data_writer_ptr));
+    } else {
+      RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType)));
+      auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
+      int buffer_idx = 0;
+      for (int i = 0; i < length; i++) {
+        if (data->IsNull(offset + i)) {
+          def_levels_ptr[i] = 0;
+        } else {
+          def_levels_ptr[i] = 1;
+          buffer_ptr[buffer_idx++] = static_cast<ParquetCType>(data_ptr[i]);
+        }
+      }
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    }
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+// This specialization looks quite similar, but differs in two significant points:
+// * offset is applied to the pointer as late as possible, since we have
+//   sub-byte access here
+// * Arrow data is stored bitwise, thus we cannot use std::copy to transform
+//   from ArrowType::c_type to ParquetType::c_type
+template <>
+Status FileWriter::Impl::TypedWriteBatch<::parquet::BooleanType, BooleanType>(
+    ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset,
+    int64_t length) {
+  DCHECK((offset + length) <= data->length());
+  RETURN_NOT_OK(data_buffer_.Resize(length));
+  auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data());
+  auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
+  auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::BooleanType>*>(
+      column_writer);
+  if (writer->descr()->max_definition_level() == 0) {
+    // no nulls, just dump the data
+    for (int64_t i = 0; i < length; i++) {
+      buffer_ptr[i] = ::arrow::util::get_bit(data_ptr, offset + i);
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, buffer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+    int16_t* def_levels_ptr =
+        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    if (data->null_count() == 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+      for (int64_t i = 0; i < length; i++) {
+        buffer_ptr[i] = ::arrow::util::get_bit(data_ptr, offset + i);
+      }
+      // TODO(PARQUET-644): write boolean values as a packed bitmap
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    } else {
+      int buffer_idx = 0;
+      for (int i = 0; i < length; i++) {
+        if (data->IsNull(offset + i)) {
+          def_levels_ptr[i] = 0;
+        } else {
+          def_levels_ptr[i] = 1;
+          buffer_ptr[buffer_idx++] = ::arrow::util::get_bit(data_ptr, offset + i);
+        }
+      }
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    }
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+Status FileWriter::Impl::Close() {
+  if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+  PARQUET_CATCH_NOT_OK(writer_->Close());
+  return Status::OK();
+}
+
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)                            \
+  case ::arrow::Type::ENUM:                                                       \
+    return TypedWriteBatch<ParquetType, ArrowType>(writer, data, offset, length); \
+    break;
+
+Status FileWriter::Impl::WriteFlatColumnChunk(
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
+  ::parquet::ColumnWriter* writer;
+  PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
+  switch (data->type_enum()) {
+    TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+    TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+    TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
+    TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+    TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
+    case ::arrow::Type::UINT32:
+      if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
+        // Parquet 1.0 readers cannot read the UINT_32 logical type. Thus we
+        // need to use the larger Int64Type to store the values losslessly.
+        return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(
+            writer, data, offset, length);
+      } else {
+        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(
+            writer, data, offset, length);
+      }
+    TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+    TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+    TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
+    TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+    TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
+    TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+    default:
+      return Status::NotImplemented(data->type()->ToString());
+  }
+}
+
+Status FileWriter::Impl::WriteFlatColumnChunk(
+    const StringArray* data, int64_t offset, int64_t length) {
+  ::parquet::ColumnWriter* column_writer;
+  PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+  DCHECK((offset + length) <= data->length());
+  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(::parquet::ByteArray)));
+  auto buffer_ptr = reinterpret_cast<::parquet::ByteArray*>(data_buffer_.mutable_data());
+  auto values = std::dynamic_pointer_cast<PrimitiveArray>(data->values());
+  DCHECK(values != nullptr);
+  auto data_ptr = reinterpret_cast<const uint8_t*>(values->data()->data());
+  auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::ByteArrayType>*>(
+      column_writer);
+  if (writer->descr()->max_definition_level() > 0) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+  }
+  int16_t* def_levels_ptr = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+  if (writer->descr()->max_definition_level() == 0 || data->null_count() == 0) {
+    // no nulls, just dump the data
+    for (int64_t i = 0; i < length; i++) {
+      buffer_ptr[i] = ::parquet::ByteArray(
+          data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
+    }
+    if (writer->descr()->max_definition_level() > 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    int buffer_idx = 0;
+    for (int64_t i = 0; i < length; i++) {
+      if (data->IsNull(offset + i)) {
+        def_levels_ptr[i] = 0;
+      } else {
+        def_levels_ptr[i] = 1;
+        buffer_ptr[buffer_idx++] = ::parquet::ByteArray(
+            data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
+      }
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+FileWriter::FileWriter(
+    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
+    : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
+
+Status FileWriter::NewRowGroup(int64_t chunk_size) {
+  return impl_->NewRowGroup(chunk_size);
+}
+
+Status FileWriter::WriteFlatColumnChunk(
+    const ::arrow::Array* array, int64_t offset, int64_t length) {
+  int64_t real_length = length;
+  if (length == -1) { real_length = array->length(); }
+  if (array->type_enum() == ::arrow::Type::STRING) {
+    auto string_array = dynamic_cast<const StringArray*>(array);
+    DCHECK(string_array);
+    return impl_->WriteFlatColumnChunk(string_array, offset, real_length);
+  } else {
+    auto primitive_array = dynamic_cast<const PrimitiveArray*>(array);
+    if (!primitive_array) {
+      return Status::NotImplemented("Table must consist of PrimitiveArray instances");
+    }
+    return impl_->WriteFlatColumnChunk(primitive_array, offset, real_length);
+  }
+}
+
+Status FileWriter::Close() {
+  return impl_->Close();
+}
+
+MemoryPool* FileWriter::memory_pool() const {
+  return impl_->pool_;
+}
+
+FileWriter::~FileWriter() {}
+
+Status WriteFlatTable(const Table* table, MemoryPool* pool,
+    const std::shared_ptr<::parquet::OutputStream>& sink, int64_t chunk_size,
+    const std::shared_ptr<::parquet::WriterProperties>& properties) {
+  std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema;
+  RETURN_NOT_OK(
+      ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+  std::unique_ptr<ParquetFileWriter> parquet_writer =
+      ParquetFileWriter::Open(sink, schema_node, properties);
+  FileWriter writer(pool, std::move(parquet_writer));
+
+  // TODO(ARROW-232) Support writing chunked arrays.
+  for (int i = 0; i < table->num_columns(); i++) {
+    if (table->column(i)->data()->num_chunks() != 1) {
+      return Status::NotImplemented("No support for writing chunked arrays yet.");
+    }
+  }
+
+  for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) {
+    int64_t offset = chunk * chunk_size;
+    int64_t size = std::min(chunk_size, table->num_rows() - offset);
+    RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
+    for (int i = 0; i < table->num_columns(); i++) {
+      std::shared_ptr<::arrow::Array> array = table->column(i)->data()->chunk(0);
+      RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(array.get(), offset, size),
+          PARQUET_IGNORE_NOT_OK(writer.Close()));
+    }
+  }
+
+  return writer.Close();
+}
+
+}  // namespace arrow
+
+}  // namespace parquet
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
new file mode 100644
index 00000000..2008227c
--- /dev/null
+++ b/src/parquet/arrow/writer.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_WRITER_H
+#define PARQUET_ARROW_WRITER_H
+
+#include <memory>
+
+#include "parquet/api/schema.h"
+#include "parquet/api/writer.h"
+
+namespace arrow {
+
+class Array;
+class MemoryPool;
+class PrimitiveArray;
+class RowBatch;
+class Status;
+class StringArray;
+class Table;
+
+}
+
+namespace parquet {
+
+namespace arrow {
+
+/**
+ * Iterative API:
+ *  Start a new RowGroup/Chunk with NewRowGroup.
+ *  Write column-by-column the whole column chunk.
+ */
+class PARQUET_EXPORT FileWriter {
+ public:
+  FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
+
+  ::arrow::Status NewRowGroup(int64_t chunk_size);
+  ::arrow::Status WriteFlatColumnChunk(const ::arrow::Array* data, int64_t offset = 0, int64_t length = -1);
+  ::arrow::Status Close();
+
+  virtual ~FileWriter();
+
+  ::arrow::MemoryPool* memory_pool() const;
+
+ private:
+  class PARQUET_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+/**
+ * Write a flat Table to Parquet.
+ *
+ * The table shall only consist of nullable, non-repeated columns of primitive type.
+ */ +::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table, ::arrow::MemoryPool* pool, + const std::shared_ptr& sink, int64_t chunk_size, + const std::shared_ptr& properties = + default_writer_properties()); + +} // namespace arrow + +} // namespace parquet + +#endif // PARQUET_ARROW_WRITER_H diff --git a/thirdparty/build_thirdparty.sh b/thirdparty/build_thirdparty.sh index dca586c6..82294848 100755 --- a/thirdparty/build_thirdparty.sh +++ b/thirdparty/build_thirdparty.sh @@ -15,6 +15,7 @@ else # Allow passing specific libs to build on the command line for arg in "$*"; do case $arg in + "arrow") F_ARROW=1 ;; "zlib") F_ZLIB=1 ;; "gbenchmark") F_GBENCHMARK=1 ;; "gtest") F_GTEST=1 ;; @@ -57,6 +58,15 @@ fi STANDARD_DARWIN_FLAGS="-std=c++11 -stdlib=libc++" +# build arrow +if [ -n "$F_ALL" -o -n "$F_ARROW" ]; then + cd $TP_DIR/$ARROW_BASEDIR/cpp + source ./setup_build_env.sh + cmake . -DARROW_PARQUET=OFF -DARROW_HDFS=ON -DCMAKE_INSTALL_PREFIX=$PREFIX + make -j$PARALLEL install + # : +fi + # build googletest GOOGLETEST_ERROR="failed for googletest!" if [ -n "$F_ALL" -o -n "$F_GTEST" ]; then diff --git a/thirdparty/download_thirdparty.sh b/thirdparty/download_thirdparty.sh index fae6c9cc..23bdb968 100755 --- a/thirdparty/download_thirdparty.sh +++ b/thirdparty/download_thirdparty.sh @@ -20,6 +20,11 @@ download_extract_and_cleanup() { rm $filename } +if [ ! -d ${ARROW_BASEDIR} ]; then + echo "Fetching arrow" + download_extract_and_cleanup $ARROW_URL +fi + if [ ! -d ${SNAPPY_BASEDIR} ]; then echo "Fetching snappy" download_extract_and_cleanup $SNAPPY_URL diff --git a/thirdparty/set_thirdparty_env.sh b/thirdparty/set_thirdparty_env.sh index 80715ef7..3733d088 100644 --- a/thirdparty/set_thirdparty_env.sh +++ b/thirdparty/set_thirdparty_env.sh @@ -7,6 +7,7 @@ if [ -z "$THIRDPARTY_DIR" ]; then THIRDPARTY_DIR=$SOURCE_DIR fi +export ARROW_HOME=$THIRDPARTY_DIR/installed export SNAPPY_HOME=$THIRDPARTY_DIR/installed export ZLIB_HOME=$THIRDPARTY_DIR/installed # build script doesn't support building thrift on OSX diff --git a/thirdparty/versions.sh b/thirdparty/versions.sh index b56262a8..137bdc2f 100755 --- a/thirdparty/versions.sh +++ b/thirdparty/versions.sh @@ -1,3 +1,7 @@ +ARROW_VERSION="52089d609dff3d8d2abe99c7b94f7af9fe4735bd" +ARROW_URL="https://github.com/apache/arrow/archive/${ARROW_VERSION}.tar.gz" +ARROW_BASEDIR="arrow-${ARROW_VERSION}" + SNAPPY_VERSION=1.1.3 SNAPPY_URL="https://github.com/google/snappy/releases/download/${SNAPPY_VERSION}/snappy-${SNAPPY_VERSION}.tar.gz" SNAPPY_BASEDIR=snappy-$SNAPPY_VERSION From 874b33dedeb3bf3106860bd52dfe6e2561003e6b Mon Sep 17 00:00:00 2001 From: "Korn, Uwe" Date: Tue, 13 Sep 2016 09:08:19 +0200 Subject: [PATCH 02/10] Add boost libraries for Arrow --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 780d9f9b..1aeba6cb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,6 +9,8 @@ addons: - g++-4.9 - valgrind - libboost-dev + - libboost-filesystem-dev + - libboost-system-dev - libboost-program-options-dev - libboost-test-dev - libssl-dev From e0e1518fb02f33cdd97d28f06197cb262c3f1cb1 Mon Sep 17 00:00:00 2001 From: "Uwe L. 
Korn" Date: Tue, 13 Sep 2016 09:36:10 +0200 Subject: [PATCH 03/10] Build parquet_arrow in Travis --- .travis.yml | 4 ++-- conda.recipe/build.sh | 1 + thirdparty/versions.sh | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1aeba6cb..6dc994e7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,7 +28,7 @@ matrix: before_script: - source $TRAVIS_BUILD_DIR/ci/before_script_travis.sh - cmake -DCMAKE_CXX_FLAGS="-Werror" -DPARQUET_TEST_MEMCHECK=ON -DPARQUET_BUILD_BENCHMARKS=ON - -DPARQUET_GENERATE_COVERAGE=1 $TRAVIS_BUILD_DIR + -DPARQUET_ARROW=ON -DPARQUET_GENERATE_COVERAGE=1 $TRAVIS_BUILD_DIR - export PARQUET_TEST_DATA=$TRAVIS_BUILD_DIR/data - compiler: clang os: linux @@ -78,7 +78,7 @@ before_install: before_script: - source $TRAVIS_BUILD_DIR/ci/before_script_travis.sh -- cmake -DCMAKE_CXX_FLAGS="-Werror" $TRAVIS_BUILD_DIR +- cmake -DCMAKE_CXX_FLAGS="-Werror" -DPARQUET_ARROW=ON $TRAVIS_BUILD_DIR - export PARQUET_TEST_DATA=$TRAVIS_BUILD_DIR/data script: diff --git a/conda.recipe/build.sh b/conda.recipe/build.sh index 7fe8f91b..f77475c9 100644 --- a/conda.recipe/build.sh +++ b/conda.recipe/build.sh @@ -53,6 +53,7 @@ cmake \ -DCMAKE_BUILD_TYPE=debug \ -DCMAKE_INSTALL_PREFIX=$PREFIX \ -DPARQUET_BUILD_BENCHMARKS=off \ + -DPARQUET_ARROW=ON \ .. make diff --git a/thirdparty/versions.sh b/thirdparty/versions.sh index 137bdc2f..05f5cc23 100755 --- a/thirdparty/versions.sh +++ b/thirdparty/versions.sh @@ -1,4 +1,4 @@ -ARROW_VERSION="52089d609dff3d8d2abe99c7b94f7af9fe4735bd" +ARROW_VERSION="6b8abb4402ff1f39fc5944a7df6e3b4755691d87" ARROW_URL="https://github.com/apache/arrow/archive/${ARROW_VERSION}.tar.gz" ARROW_BASEDIR="arrow-${ARROW_VERSION}" From 251262a6466a599a6dea92879b1652450d05d97e Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Tue, 13 Sep 2016 09:37:45 +0200 Subject: [PATCH 04/10] Style fixes --- src/parquet/arrow/arrow-reader-writer-test.cc | 32 +++++++++++-------- src/parquet/arrow/arrow-schema-test.cc | 9 +++--- src/parquet/arrow/io.h | 2 +- src/parquet/arrow/reader.h | 1 - src/parquet/arrow/schema.h | 8 ++--- src/parquet/arrow/test-util.h | 31 +++++++++++------- src/parquet/arrow/utils.h | 10 +++--- src/parquet/arrow/writer.cc | 2 +- src/parquet/arrow/writer.h | 12 +++---- 9 files changed, 61 insertions(+), 46 deletions(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 8faab43c..f993f547 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -265,8 +265,9 @@ class TestParquetIO : public ::testing::Test { // There we write an UInt32 Array but receive an Int64 Array as result for // Parquet version 1.0. -typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, - ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType, +typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, + ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, + ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType> TestTypes; TYPED_TEST_CASE(TestParquetIO, TestTypes); @@ -284,8 +285,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { auto values = NonNullArray(SMALL_SIZE); std::shared_ptr
table = MakeSimpleTable(values, false); this->sink_ = std::make_shared(); - ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), this->sink_, - values->length(), default_writer_properties())); + ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), + this->sink_, values->length(), default_writer_properties())); std::shared_ptr
out; this->ReadTableFromFile(this->ReaderFromSink(), &out); @@ -312,8 +313,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { std::shared_ptr values = NullableArray(SMALL_SIZE, 10); std::shared_ptr
table = MakeSimpleTable(values, true); this->sink_ = std::make_shared(); - ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), this->sink_, - values->length(), default_writer_properties())); + ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), + this->sink_, values->length(), default_writer_properties())); this->ReadAndCheckSingleColumnTable(values); } @@ -365,8 +366,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { auto values = NullableArray(LARGE_SIZE, 100); std::shared_ptr
table = MakeSimpleTable(values, true); this->sink_ = std::make_shared(); - ASSERT_OK_NO_THROW(WriteFlatTable( - table.get(), ::arrow::default_memory_pool(), this->sink_, 512, default_writer_properties())); + ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), + this->sink_, 512, default_writer_properties())); this->ReadAndCheckSingleColumnTable(values); } @@ -375,7 +376,8 @@ using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>; TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { // This also tests max_definition_level = 1 - std::shared_ptr values = NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100); + std::shared_ptr values = + NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100); std::shared_ptr
table = MakeSimpleTable(values, true); // Parquet 2.0 roundtrip should yield an uint32_t column again @@ -391,7 +393,8 @@ TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { // This also tests max_definition_level = 1 - std::shared_ptr values = NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100); + std::shared_ptr values = + NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100); std::shared_ptr
table = MakeSimpleTable(values, true); // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0 @@ -401,8 +404,8 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { ::parquet::WriterProperties::Builder() .version(ParquetVersion::PARQUET_1_0) ->build(); - ASSERT_OK_NO_THROW( - WriteFlatTable(table.get(), ::arrow::default_memory_pool(), this->sink_, 512, properties)); + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), ::arrow::default_memory_pool(), this->sink_, 512, properties)); std::shared_ptr expected_values; std::shared_ptr int64_data = @@ -481,8 +484,9 @@ class TestPrimitiveParquetIO : public TestParquetIO { } }; -typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, - ::arrow::UInt32Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, +typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, + ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, ::arrow::Int32Type, + ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, ::arrow::DoubleType> PrimitiveTestTypes; TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes); diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index 31a47223..3dfaf149 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -48,10 +48,12 @@ const auto INT64 = std::make_shared<::arrow::Int64Type>(); const auto FLOAT = std::make_shared<::arrow::FloatType>(); const auto DOUBLE = std::make_shared<::arrow::DoubleType>(); const auto UTF8 = std::make_shared<::arrow::StringType>(); -const auto TIMESTAMP_MS = std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI); +const auto TIMESTAMP_MS = + std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI); // TODO: This requires parquet-cpp implementing the MICROS enum value // const auto TIMESTAMP_US = std::make_shared(TimestampType::Unit::MICRO); -const auto BINARY = std::make_shared<::arrow::ListType>(std::make_shared("", UINT8)); +const auto BINARY = + std::make_shared<::arrow::ListType>(std::make_shared("", UINT8)); const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4); class TestConvertParquetSchema : public ::testing::Test { @@ -180,8 +182,7 @@ class TestConvertArrowSchema : public ::testing::Test { NodePtr schema_node = GroupNode::Make("schema", Repetition::REPEATED, nodes); const GroupNode* expected_schema_node = static_cast(schema_node.get()); - const GroupNode* result_schema_node = - result_schema_->group_node(); + const GroupNode* result_schema_node = result_schema_->group_node(); ASSERT_EQ(expected_schema_node->field_count(), result_schema_node->field_count()); diff --git a/src/parquet/arrow/io.h b/src/parquet/arrow/io.h index feab7c53..4585c7e5 100644 --- a/src/parquet/arrow/io.h +++ b/src/parquet/arrow/io.h @@ -32,7 +32,7 @@ namespace arrow { class MemoryPool; -} // namespace arrow +} // namespace arrow namespace parquet { diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h index 997fd7ac..e7257285 100644 --- a/src/parquet/arrow/reader.h +++ b/src/parquet/arrow/reader.h @@ -33,7 +33,6 @@ class MemoryPool; class RowBatch; class Status; class Table; - } namespace parquet { diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h index 0baed3cc..6917b905 100644 --- a/src/parquet/arrow/schema.h +++ b/src/parquet/arrow/schema.h @@ -20,13 +20,13 @@ #include 
+#include "arrow/schema.h" +#include "arrow/type.h" +#include "arrow/util/visibility.h" + #include "parquet/api/schema.h" #include "parquet/api/writer.h" -#include -#include -#include - namespace arrow { class Status; diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index d1358373..deac9f75 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -41,7 +41,8 @@ typename std::enable_if::value, NonNullArray(size_t size) { std::vector values; ::arrow::test::random_real(size, 0, 0, 1, &values); - ::arrow::NumericBuilder builder(::arrow::default_memory_pool(), std::make_shared()); + ::arrow::NumericBuilder builder( + ::arrow::default_memory_pool(), std::make_shared()); builder.Append(values.data(), values.size()); return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish()); } @@ -52,7 +53,8 @@ typename std::enable_if::value, NonNullArray(size_t size) { std::vector values; ::arrow::test::randint(size, 0, 64, &values); - ::arrow::NumericBuilder builder(::arrow::default_memory_pool(), std::make_shared()); + ::arrow::NumericBuilder builder( + ::arrow::default_memory_pool(), std::make_shared()); builder.Append(values.data(), values.size()); return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish()); } @@ -61,7 +63,8 @@ template typename std::enable_if::value, std::shared_ptr<::arrow::StringArray>>::type NonNullArray(size_t size) { - ::arrow::StringBuilder builder(::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>()); + ::arrow::StringBuilder builder( + ::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>()); for (size_t i = 0; i < size; i++) { builder.Append("test-string"); } @@ -72,7 +75,8 @@ template <> std::shared_ptr<::arrow::PrimitiveArray> NonNullArray<::arrow::BooleanType>(size_t size) { std::vector values; ::arrow::test::randint(size, 0, 1, &values); - ::arrow::BooleanBuilder builder(::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>()); + ::arrow::BooleanBuilder builder( + ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>()); builder.Append(values.data(), values.size()); return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish()); } @@ -90,7 +94,8 @@ NullableArray(size_t size, size_t num_nulls) { valid_bytes[i * 2] = 0; } - ::arrow::NumericBuilder builder(::arrow::default_memory_pool(), std::make_shared()); + ::arrow::NumericBuilder builder( + ::arrow::default_memory_pool(), std::make_shared()); builder.Append(values.data(), values.size(), valid_bytes.data()); return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish()); } @@ -108,7 +113,8 @@ NullableArray(size_t size, size_t num_nulls) { valid_bytes[i * 2] = 0; } - ::arrow::NumericBuilder builder(::arrow::default_memory_pool(), std::make_shared()); + ::arrow::NumericBuilder builder( + ::arrow::default_memory_pool(), std::make_shared()); builder.Append(values.data(), values.size(), valid_bytes.data()); return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish()); } @@ -124,7 +130,8 @@ NullableArray(size_t size, size_t num_nulls) { valid_bytes[i * 2] = 0; } - ::arrow::StringBuilder builder(::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>()); + ::arrow::StringBuilder builder( + ::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>()); for (size_t i = 0; i < size; i++) { builder.Append("test-string"); } @@ -143,13 +150,14 @@ std::shared_ptr<::arrow::PrimitiveArray> NullableArray<::arrow::BooleanType>( 
     valid_bytes[i * 2] = 0;
   }
 
-  ::arrow::BooleanBuilder builder(::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  ::arrow::BooleanBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
   builder.Append(values.data(), values.size(), valid_bytes.data());
   return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
 }
 
-std::shared_ptr<::arrow::Column> MakeColumn(
-    const std::string& name, const std::shared_ptr<::arrow::Array>& array, bool nullable) {
+std::shared_ptr<::arrow::Column> MakeColumn(const std::string& name,
+    const std::shared_ptr<::arrow::Array>& array, bool nullable) {
   auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable);
   return std::make_shared<::arrow::Column>(field, array);
 }
@@ -182,7 +190,8 @@ void ExpectArray(typename ArrowType::c_type* expected, ::arrow::Array* result) {
 
 template <>
 void ExpectArray<::arrow::BooleanType>(uint8_t* expected, ::arrow::Array* result) {
-  ::arrow::BooleanBuilder builder(::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  ::arrow::BooleanBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
   builder.Append(expected, result->length());
   std::shared_ptr<::arrow::Array> expected_array = builder.Finish();
   EXPECT_TRUE(result->Equals(expected_array));

diff --git a/src/parquet/arrow/utils.h b/src/parquet/arrow/utils.h
index 8a085efa..b443c998 100644
--- a/src/parquet/arrow/utils.h
+++ b/src/parquet/arrow/utils.h
@@ -26,10 +26,12 @@ namespace parquet {
 namespace arrow {
 
-#define PARQUET_CATCH_NOT_OK(s) \
-  try {                         \
-    (s);                        \
-  } catch (const ::parquet::ParquetException& e) { return ::arrow::Status::Invalid(e.what()); }
+#define PARQUET_CATCH_NOT_OK(s)                    \
+  try {                                            \
+    (s);                                           \
+  } catch (const ::parquet::ParquetException& e) { \
+    return ::arrow::Status::Invalid(e.what());     \
+  }
 
 #define PARQUET_IGNORE_NOT_OK(s) \
   try {                          \

diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 5d79ea92..985b17e5 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -221,7 +221,7 @@ Status FileWriter::Impl::Close() {
 }
 
 #define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)                            \
-  case ::arrow::Type::ENUM: \
+  case ::arrow::Type::ENUM:                                                       \
     return TypedWriteBatch<ParquetType, ArrowType>(writer, data, offset, length); \
     break;

diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index 2008227c..92524d8c 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -32,7 +32,6 @@ class RowBatch;
 class Status;
 class StringArray;
 class Table;
-
 }
 
 namespace parquet {
@@ -49,7 +48,8 @@ class PARQUET_EXPORT FileWriter {
   FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
 
   ::arrow::Status NewRowGroup(int64_t chunk_size);
-  ::arrow::Status WriteFlatColumnChunk(const ::arrow::Array* data, int64_t offset = 0, int64_t length = -1);
+  ::arrow::Status WriteFlatColumnChunk(
+      const ::arrow::Array* data, int64_t offset = 0, int64_t length = -1);
   ::arrow::Status Close();
 
   virtual ~FileWriter();
@@ -66,10 +66,10 @@ class PARQUET_EXPORT FileWriter {
  *
  * The table shall only consist of nullable, non-repeated columns of primitive type.
 */
-::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table, ::arrow::MemoryPool* pool,
-    const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
-    const std::shared_ptr<WriterProperties>& properties =
-        default_writer_properties());
+::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
+    ::arrow::MemoryPool* pool, const std::shared_ptr<OutputStream>& sink,
+    int64_t chunk_size,
+    const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
 
 }  // namespace arrow
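[A minimal usage sketch of the WriteFlatTable entry point declared above; not part of the patch series. The helper name is hypothetical, and it assumes parquet::InMemoryOutputStream (from parquet/api/io.h) as the sink, as the tests in this series do:

    // Hedged sketch: write an Arrow table to an in-memory Parquet sink.
    // `table` is assumed to obey the constraint documented above
    // (flat: nullable, non-repeated columns of primitive type).
    #include <memory>
    #include "parquet/api/io.h"
    #include "parquet/arrow/writer.h"

    ::arrow::Status WriteExample(const std::shared_ptr<::arrow::Table>& table) {
      auto sink = std::make_shared<::parquet::InMemoryOutputStream>();
      // 512 rows per chunk; properties default to default_writer_properties().
      return ::parquet::arrow::WriteFlatTable(
          table.get(), ::arrow::default_memory_pool(), sink, 512);
    }
]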
From 1d39a60357d21bffdfe0403e8776c6d1a66bfdc1 Mon Sep 17 00:00:00 2001
From: "Uwe L. Korn"
Date: Tue, 13 Sep 2016 09:44:19 +0200
Subject: [PATCH 05/10] Import MemoryPool instead of declaring it

---
 src/parquet/arrow/io.h | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/src/parquet/arrow/io.h b/src/parquet/arrow/io.h
index 4585c7e5..dc606358 100644
--- a/src/parquet/arrow/io.h
+++ b/src/parquet/arrow/io.h
@@ -26,13 +26,7 @@
 #include "parquet/api/io.h"
 
 #include "arrow/io/interfaces.h"
-#include "arrow/util/visibility.h"
-
-namespace arrow {
-
-class MemoryPool;
-
-}  // namespace arrow
+#include "arrow/util/memory-pool.h"
 
 namespace parquet {

From 45de04407895ed14a3e0d0d2cde800bc3bb7e678 Mon Sep 17 00:00:00 2001
From: "Uwe L. Korn"
Date: Tue, 13 Sep 2016 09:45:56 +0200
Subject: [PATCH 06/10] Style fixes for IO

---
 src/parquet/arrow/arrow-io-test.cc | 16 ++++++++++------
 src/parquet/arrow/io.cc            |  5 ++---
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/src/parquet/arrow/arrow-io-test.cc b/src/parquet/arrow/arrow-io-test.cc
index 83a8b9bb..377fb977 100644
--- a/src/parquet/arrow/arrow-io-test.cc
+++ b/src/parquet/arrow/arrow-io-test.cc
@@ -29,9 +29,13 @@
 #include "parquet/api/io.h"
 #include "parquet/arrow/io.h"
 
+using arrow::default_memory_pool;
 using arrow::MemoryPool;
 using arrow::Status;
 
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
 namespace parquet {
 namespace arrow {
 
@@ -54,7 +58,7 @@ TEST(TestParquetAllocator, DefaultCtor) {
 // Pass through to the default memory pool
 class TrackingPool : public MemoryPool {
  public:
-  TrackingPool() : pool_(::arrow::default_memory_pool()), bytes_allocated_(0) {}
+  TrackingPool() : pool_(default_memory_pool()), bytes_allocated_(0) {}
 
   Status Allocate(int64_t size, uint8_t** out) override {
     RETURN_NOT_OK(pool_->Allocate(size, out));
@@ -99,7 +103,7 @@ TEST(TestParquetAllocator, CustomPool) {
 // ----------------------------------------------------------------------
 // Read source tests
 
-class BufferReader : public ::arrow::io::RandomAccessFile {
+class BufferReader : public ArrowROFile {
  public:
   BufferReader(const uint8_t* buffer, int buffer_size)
       : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
@@ -151,7 +155,7 @@ TEST(TestParquetReadSource, Basics) {
   std::string data = "this is the data";
   auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
 
-  ParquetAllocator allocator(::arrow::default_memory_pool());
+  ParquetAllocator allocator(default_memory_pool());
 
   auto file = std::make_shared<BufferReader>(data_buffer, data.size());
   auto source = std::make_shared<ParquetReadSource>(&allocator);
@@ -164,7 +168,7 @@ TEST(TestParquetReadSource, Basics) {
   ASSERT_NO_THROW(source->Seek(0));
 
   // Seek out of bounds
-  ASSERT_THROW(source->Seek(100), ::parquet::ParquetException);
+  ASSERT_THROW(source->Seek(100), ParquetException);
 
   uint8_t buffer[50];
 
   ASSERT_EQ(0, std::memcmp(buffer, "this", 4));
   ASSERT_EQ(4, source->Tell());
 
-  std::shared_ptr<::parquet::Buffer> pq_buffer;
+  std::shared_ptr<Buffer> pq_buffer;
   ASSERT_NO_THROW(pq_buffer = source->Read(7));
 
-  auto expected_buffer = std::make_shared<::parquet::Buffer>(data_buffer + 4, 7);
+  auto expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7);
   ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
 }

diff --git a/src/parquet/arrow/io.cc b/src/parquet/arrow/io.cc
index 3de8eb11..8e2645ab 100644
--- a/src/parquet/arrow/io.cc
+++ b/src/parquet/arrow/io.cc
@@ -23,7 +23,6 @@
 #include "parquet/api/io.h"
 #include "parquet/arrow/utils.h"
 
-#include "arrow/util/memory-pool.h"
 #include "arrow/util/status.h"
 
 using arrow::Status;
@@ -93,10 +92,10 @@ int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) {
   return bytes_read;
 }
 
-std::shared_ptr<::parquet::Buffer> ParquetReadSource::Read(int64_t nbytes) {
+std::shared_ptr<Buffer> ParquetReadSource::Read(int64_t nbytes) {
   // TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests
   // that there should be more code sharing amongst file-like sources
-  auto result = std::make_shared<::parquet::OwnedMutableBuffer>(0, allocator_);
+  auto result = std::make_shared<OwnedMutableBuffer>(0, allocator_);
   result->Resize(nbytes);
 
   int64_t bytes_read = Read(nbytes, result->mutable_data());

From 3f3e24b32fb73b87acefd8c0218b1e4d934a8680 Mon Sep 17 00:00:00 2001
From: "Uwe L. Korn"
Date: Fri, 16 Sep 2016 11:36:41 +0200
Subject: [PATCH 07/10] Fix templating problem

---
 src/parquet/arrow/arrow-reader-writer-test.cc | 10 ----------
 src/parquet/arrow/writer.cc                   |  2 +-
 2 files changed, 1 insertion(+), 11 deletions(-)

diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index f993f547..e4e9efa1 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -40,16 +40,6 @@ using arrow::Status;
 using arrow::Table;
 
 using ParquetBuffer = parquet::Buffer;
-using parquet::BufferReader;
-using parquet::default_writer_properties;
-using parquet::InMemoryOutputStream;
-using parquet::LogicalType;
-using parquet::ParquetFileReader;
-using parquet::ParquetFileWriter;
-using parquet::RandomAccessSource;
-using parquet::Repetition;
-using parquet::SchemaDescriptor;
-using parquet::ParquetVersion;
 using ParquetType = parquet::Type;
 using parquet::schema::GroupNode;
 using parquet::schema::NodePtr;

diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 985b17e5..6a56bb34 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -167,7 +167,7 @@ Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer,
 // * Arrow data is stored bitwise thus we cannot use std::copy to transform from
 //   ArrowType::c_type to ParquetType::c_type
 template <>
-Status FileWriter::Impl::TypedWriteBatch<::parquet::BooleanType, BooleanType>(
+Status FileWriter::Impl::TypedWriteBatch<::parquet::BooleanType, ::arrow::BooleanType>(
     ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset,
     int64_t length) {
   DCHECK((offset + length) <= data->length());
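[The one-line writer.cc change above is subtle, so a contrived sketch of the name-lookup rule behind it may help. All names below are stand-ins, not the real classes: inside namespace parquet::arrow, an unqualified BooleanType is found in the enclosing namespace and resolves to ::parquet::BooleanType, so the old specialization paired the Parquet type with itself.

    // Minimal illustration with hypothetical names.
    namespace parquet {
    struct BooleanType {};  // stand-in for parquet's own BooleanType
    namespace arrow {

    template <typename ParquetType, typename ArrowType>
    void Write() {}

    // Unqualified lookup walks out through the enclosing namespace, so this
    // specializes Write<::parquet::BooleanType, ::parquet::BooleanType>:
    template <>
    void Write<BooleanType, BooleanType>() {}

    // The intended pairing must qualify the Arrow type explicitly:
    // template <> void Write<BooleanType, ::arrow::BooleanType>() {}

    }  // namespace arrow
    }  // namespace parquet
]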
Korn" Date: Fri, 16 Sep 2016 12:58:11 +0000 Subject: [PATCH 08/10] Style fixes --- src/parquet/arrow/reader.cc | 34 +++++++++++++----------------- src/parquet/arrow/writer.cc | 42 ++++++++++++++++++------------------- 2 files changed, 36 insertions(+), 40 deletions(-) diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 660938dd..056b5abd 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -41,10 +41,6 @@ using arrow::PoolBuffer; using arrow::Status; using arrow::Table; -using parquet::ColumnReader; -using parquet::Repetition; -using parquet::TypedColumnReader; - // Help reduce verbosity using ParquetRAS = parquet::RandomAccessSource; using ParquetReader = parquet::ParquetFileReader; @@ -67,23 +63,23 @@ using BuilderType = typename ArrowTypeTraits::builder_type; class FileReader::Impl { public: - Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); + Impl(MemoryPool* pool, std::unique_ptr reader); virtual ~Impl() {} - bool CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr); + bool CheckForFlatColumn(const ColumnDescriptor* descr); Status GetFlatColumn(int i, std::unique_ptr* out); Status ReadFlatColumn(int i, std::shared_ptr* out); Status ReadFlatTable(std::shared_ptr
 
  private:
   MemoryPool* pool_;
-  std::unique_ptr<::parquet::ParquetFileReader> reader_;
+  std::unique_ptr<ParquetFileReader> reader_;
 };
 
 class FlatColumnReader::Impl {
  public:
-  Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
-      ::parquet::ParquetFileReader* reader, int column_index);
+  Impl(MemoryPool* pool, const ColumnDescriptor* descr,
+      ParquetFileReader* reader, int column_index);
   virtual ~Impl() {}
 
   Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
@@ -130,8 +126,8 @@ class FlatColumnReader::Impl {
   }
 
   MemoryPool* pool_;
-  const ::parquet::ColumnDescriptor* descr_;
-  ::parquet::ParquetFileReader* reader_;
+  const ColumnDescriptor* descr_;
+  ParquetFileReader* reader_;
   int column_index_;
   int next_row_group_;
   std::shared_ptr<ColumnReader> column_reader_;
@@ -144,10 +140,10 @@
 };
 
 FileReader::Impl::Impl(
-    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
+    MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
     : pool_(pool), reader_(std::move(reader)) {}
 
-bool FileReader::Impl::CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr) {
+bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) {
   if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) {
     return false;
   } else if ((descr->max_definition_level() == 1) &&
@@ -196,7 +192,7 @@ Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) {
 }
 
 FileReader::FileReader(
-    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
+    MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
     : impl_(new FileReader::Impl(pool, std::move(reader))) {}
 
 FileReader::~FileReader() {}
@@ -228,8 +224,8 @@ Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) {
   return impl_->ReadFlatTable(out);
 }
 
-FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
-    ::parquet::ParquetFileReader* reader, int column_index)
+FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
+    ParquetFileReader* reader, int column_index)
     : pool_(pool),
       descr_(descr),
       reader_(reader),
@@ -320,16 +316,16 @@ Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
   int values_to_read = batch_size;
   ::arrow::StringBuilder builder(pool_, field_->type);
   while ((values_to_read > 0) && column_reader_) {
-    values_buffer_.Resize(values_to_read * sizeof(::parquet::ByteArray));
+    values_buffer_.Resize(values_to_read * sizeof(ByteArray));
     if (descr_->max_definition_level() > 0) {
       def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
     }
     auto reader =
-        dynamic_cast<::parquet::TypedColumnReader<ByteArrayType>*>(column_reader_.get());
+        dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get());
     int64_t values_read;
     int64_t levels_read;
     int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-    auto values = reinterpret_cast<::parquet::ByteArray*>(values_buffer_.mutable_data());
+    auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data());
     PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
                              values_to_read, def_levels, nullptr, values, &values_read));
     values_to_read -= levels_read;

diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 6a56bb34..5b5f41fc 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -47,11 +47,11 @@ namespace arrow {
 
 class FileWriter::Impl {
  public:
-  Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
+  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
 
   Status NewRowGroup(int64_t chunk_size);
   template <typename ParquetType, typename ArrowType>
-  Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data,
+  Status TypedWriteBatch(ColumnWriter* writer, const PrimitiveArray* data,
       int64_t offset, int64_t length);
 
   // TODO(uwe): Same code as in reader.cc the only difference is the name of the temporary
   // as expected by parquet-cpp.
   PoolBuffer data_buffer_;
   PoolBuffer def_levels_buffer_;
-  std::unique_ptr<::parquet::ParquetFileWriter> writer_;
-  ::parquet::RowGroupWriter* row_group_writer_;
+  std::unique_ptr<ParquetFileWriter> writer_;
+  RowGroupWriter* row_group_writer_;
 };
 
 FileWriter::Impl::Impl(
-    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
+    MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
     : pool_(pool),
       data_buffer_(pool),
       writer_(std::move(writer)),
@@ -114,7 +114,7 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
 }
 
 template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer,
+Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
     const PrimitiveArray* data, int64_t offset, int64_t length) {
   using ArrowCType = typename ArrowType::c_type;
   using ParquetCType = typename ParquetType::c_type;
@@ -122,7 +122,7 @@ Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer,
   DCHECK((offset + length) <= data->length());
   auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset;
   auto writer =
-      reinterpret_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer);
+      reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer);
   if (writer->descr()->max_definition_level() == 0) {
     // no nulls, just dump the data
     const ParquetCType* data_writer_ptr = nullptr;
@@ -167,14 +167,14 @@ Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer,
 // * Arrow data is stored bitwise thus we cannot use std::copy to transform from
 //   ArrowType::c_type to ParquetType::c_type
 template <>
-Status FileWriter::Impl::TypedWriteBatch<::parquet::BooleanType, ::arrow::BooleanType>(
-    ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset,
+Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
+    ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset,
     int64_t length) {
   DCHECK((offset + length) <= data->length());
   RETURN_NOT_OK(data_buffer_.Resize(length));
   auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data());
   auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
-  auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::BooleanType>*>(
+  auto writer = reinterpret_cast<TypedColumnWriter<BooleanType>*>(
       column_writer);
   if (writer->descr()->max_definition_level() == 0) {
     // no nulls, just dump the data
@@ -227,7 +227,7 @@ Status FileWriter::Impl::Close() {
 
 Status FileWriter::Impl::WriteFlatColumnChunk(
     const PrimitiveArray* data, int64_t offset, int64_t length) {
-  ::parquet::ColumnWriter* writer;
+  ColumnWriter* writer;
   PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
   switch (data->type_enum()) {
     TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
@@ -258,15 +258,15 @@ Status FileWriter::Impl::WriteFlatColumnChunk(
 
 Status FileWriter::Impl::WriteFlatColumnChunk(
     const StringArray* data, int64_t offset, int64_t length) {
-  ::parquet::ColumnWriter* column_writer;
+  ColumnWriter* column_writer;
   PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
   DCHECK((offset + length) <= data->length());
-  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(::parquet::ByteArray)));
-  auto buffer_ptr = reinterpret_cast<::parquet::ByteArray*>(data_buffer_.mutable_data());
+  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ByteArray)));
+  auto buffer_ptr = reinterpret_cast<ByteArray*>(data_buffer_.mutable_data());
   auto values = std::dynamic_pointer_cast<PrimitiveArray>(data->values());
   auto data_ptr = reinterpret_cast<const uint8_t*>(values->data()->data());
   DCHECK(values != nullptr);
-  auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::ByteArrayType>*>(
+  auto writer = reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(
       column_writer);
   if (writer->descr()->max_definition_level() > 0) {
     RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
@@ -275,7 +275,7 @@ Status FileWriter::Impl::WriteFlatColumnChunk(
   if (writer->descr()->max_definition_level() == 0 || data->null_count() == 0) {
     // no nulls, just dump the data
     for (int64_t i = 0; i < length; i++) {
-      buffer_ptr[i] = ::parquet::ByteArray(
+      buffer_ptr[i] = ByteArray(
           data->value_length(i + offset), data_ptr + data->value_offset(i));
     }
     if (writer->descr()->max_definition_level() > 0) {
@@ -289,7 +289,7 @@ Status FileWriter::Impl::WriteFlatColumnChunk(
         def_levels_ptr[i] = 0;
       } else {
         def_levels_ptr[i] = 1;
-        buffer_ptr[buffer_idx++] = ::parquet::ByteArray(
+        buffer_ptr[buffer_idx++] = ByteArray(
            data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
       }
     }
@@ -302,7 +302,7 @@ Status FileWriter::Impl::WriteFlatColumnChunk(
 }
 
 FileWriter::FileWriter(
-    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
+    MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
     : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
 
 Status FileWriter::NewRowGroup(int64_t chunk_size) {
@@ -337,9 +337,9 @@ MemoryPool* FileWriter::memory_pool() const {
 FileWriter::~FileWriter() {}
 
 Status WriteFlatTable(const Table* table, MemoryPool* pool,
-    const std::shared_ptr<::parquet::OutputStream>& sink, int64_t chunk_size,
-    const std::shared_ptr<::parquet::WriterProperties>& properties) {
-  std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema;
+    const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
+    const std::shared_ptr<WriterProperties>& properties) {
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
   RETURN_NOT_OK(
       ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema));
   auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());

From 62f0f88734e3ea6e66501e3554aa72335eda7f5d Mon Sep 17 00:00:00 2001
From: "Uwe L. Korn"
Korn" Date: Sun, 18 Sep 2016 17:02:10 +0000 Subject: [PATCH 09/10] Add static linkage --- CMakeLists.txt | 8 ++-- src/parquet/arrow/CMakeLists.txt | 68 +++++++++++++++++++++----------- 2 files changed, 49 insertions(+), 27 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d085da95..42b10eeb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -610,10 +610,10 @@ if (PARQUET_ARROW) set_target_properties(arrow PROPERTIES IMPORTED_LOCATION ${ARROW_SHARED_LIB}) add_library(arrow_io SHARED IMPORTED) set_target_properties(arrow_io PROPERTIES IMPORTED_LOCATION ${ARROW_IO_SHARED_LIB}) - #add_library(arrow_static STAIC IMPORTED) - #set_target_properties(arrow_static PROPERTIES IMPORTED_LOCATION ${ARROW_STATIC_LIB}) - #add_library(arrow_io_static STATIC IMPORTED) - #set_target_properties(arrow_io_static PROPERTIES IMPORTED_LOCATION ${ARROW_IO_STATIC_LIB}) + add_library(arrow_static STATIC IMPORTED) + set_target_properties(arrow_static PROPERTIES IMPORTED_LOCATION ${ARROW_STATIC_LIB}) + add_library(arrow_io_static STATIC IMPORTED) + set_target_properties(arrow_io_static PROPERTIES IMPORTED_LOCATION ${ARROW_IO_STATIC_LIB}) add_subdirectory(src/parquet/arrow) endif() diff --git a/src/parquet/arrow/CMakeLists.txt b/src/parquet/arrow/CMakeLists.txt index 3733d031..5d923a7c 100644 --- a/src/parquet/arrow/CMakeLists.txt +++ b/src/parquet/arrow/CMakeLists.txt @@ -25,33 +25,59 @@ set(PARQUET_ARROW_SRCS writer.cc ) -set(PARQUET_ARROW_LIBS - arrow - arrow_io - parquet_shared -) - -add_library(parquet_arrow SHARED +add_library(parquet_arrow_objlib OBJECT ${PARQUET_ARROW_SRCS} ) -target_link_libraries(parquet_arrow ${PARQUET_ARROW_LIBS}) -SET_TARGET_PROPERTIES(parquet_arrow PROPERTIES LINKER_LANGUAGE CXX) -if (APPLE) - set_target_properties(parquet_arrow - PROPERTIES - BUILD_WITH_INSTALL_RPATH ON - INSTALL_NAME_DIR "@rpath") +# SET_TARGET_PROPERTIES(parquet_arrow PROPERTIES LINKER_LANGUAGE CXX) + +if (PARQUET_BUILD_SHARED) + add_library(parquet_arrow_shared SHARED $) + set_target_properties(parquet_arrow_shared + PROPERTIES + LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}" + LINK_FLAGS "${SHARED_LINK_FLAGS}" + OUTPUT_NAME "parquet_arrow") + target_link_libraries(parquet_arrow_shared + arrow + arrow_io + parquet_shared) + if (APPLE) + set_target_properties(parquet_arrow_shared + PROPERTIES + BUILD_WITH_INSTALL_RPATH ON + INSTALL_NAME_DIR "@rpath") + endif() endif() -ADD_PARQUET_TEST(arrow-schema-test) -ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow arrow) +if (PARQUET_BUILD_STATIC) + add_library(parquet_arrow_static STATIC $) + set_target_properties(parquet_arrow_static + PROPERTIES + LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}" + OUTPUT_NAME "parquet_arrow") + target_link_libraries(parquet_arrow_static + arrow_static + arrow_static + parquet_static) + install(TARGETS parquet_arrow_static + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib) +endif() +ADD_PARQUET_TEST(arrow-schema-test) ADD_PARQUET_TEST(arrow-io-test) -ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow) - ADD_PARQUET_TEST(arrow-reader-writer-test) -ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow) + +if (PARQUET_BUILD_STATIC) + ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_static) + ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_static) + ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_static) +else() + ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_shared) + ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_shared) + 
+  ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_shared)
+endif()
 
 # Headers: top level
 install(FILES
@@ -62,7 +88,3 @@ install(FILES
   writer.h
   DESTINATION include/parquet/arrow)
 
-install(TARGETS parquet_arrow
-  LIBRARY DESTINATION lib
-  ARCHIVE DESTINATION lib)
-

From e55ab1f63e4ebe4c82e60719beae1d1e87fc6e72 Mon Sep 17 00:00:00 2001
From: "Uwe L. Korn"
Date: Sun, 18 Sep 2016 17:19:21 +0000
Subject: [PATCH 10/10] verbose ctest output

---
 ci/travis_script_cpp.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh
index 2e7dcdd1..8794559c 100755
--- a/ci/travis_script_cpp.sh
+++ b/ci/travis_script_cpp.sh
@@ -16,13 +16,13 @@ make lint
 
 if [ $TRAVIS_OS_NAME == "linux" ]; then
   make -j4 || exit 1
-  ctest -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
+  ctest -VV -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
 
   sudo pip install cpp_coveralls
 
   export PARQUET_ROOT=$TRAVIS_BUILD_DIR
   $TRAVIS_BUILD_DIR/ci/upload_coverage.sh
 else
   make -j4 || exit 1
-  ctest -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
+  ctest -VV -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
 fi
 
popd
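
[Taken together with the write path sketched after patch 04, the series yields a full round trip. A hedged read-side sketch of the FileReader API added above; the helper name is hypothetical, and ParquetFileReader::OpenFile as the file-opening entry point is an assumption, not part of these patches:

    // Sketch only: read a Parquet file back into an Arrow table.
    #include <memory>
    #include <string>
    #include <utility>
    #include "parquet/arrow/reader.h"

    ::arrow::Status ReadExample(
        const std::string& path, std::shared_ptr<::arrow::Table>* out) {
      std::unique_ptr<::parquet::ParquetFileReader> pq_reader =
          ::parquet::ParquetFileReader::OpenFile(path);
      ::parquet::arrow::FileReader reader(
          ::arrow::default_memory_pool(), std::move(pq_reader));
      // ReadFlatTable mirrors WriteFlatTable: flat, primitive columns only.
      return reader.ReadFlatTable(out);
    }
]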