diff --git a/.travis.yml b/.travis.yml index 780d9f9b..6dc994e7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,6 +9,8 @@ addons: - g++-4.9 - valgrind - libboost-dev + - libboost-filesystem-dev + - libboost-system-dev - libboost-program-options-dev - libboost-test-dev - libssl-dev @@ -26,7 +28,7 @@ matrix: before_script: - source $TRAVIS_BUILD_DIR/ci/before_script_travis.sh - cmake -DCMAKE_CXX_FLAGS="-Werror" -DPARQUET_TEST_MEMCHECK=ON -DPARQUET_BUILD_BENCHMARKS=ON - -DPARQUET_GENERATE_COVERAGE=1 $TRAVIS_BUILD_DIR + -DPARQUET_ARROW=ON -DPARQUET_GENERATE_COVERAGE=1 $TRAVIS_BUILD_DIR - export PARQUET_TEST_DATA=$TRAVIS_BUILD_DIR/data - compiler: clang os: linux @@ -76,7 +78,7 @@ before_install: before_script: - source $TRAVIS_BUILD_DIR/ci/before_script_travis.sh -- cmake -DCMAKE_CXX_FLAGS="-Werror" $TRAVIS_BUILD_DIR +- cmake -DCMAKE_CXX_FLAGS="-Werror" -DPARQUET_ARROW=ON $TRAVIS_BUILD_DIR - export PARQUET_TEST_DATA=$TRAVIS_BUILD_DIR/data script: diff --git a/CMakeLists.txt b/CMakeLists.txt index 3878056e..42b10eeb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -82,6 +82,9 @@ if ("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") option(PARQUET_BUILD_EXECUTABLES "Build the libparquet executable CLI tools" ON) + option(PARQUET_ARROW + "Build the Arrow support" + OFF) endif() # If build in-source, create the latest symlink. If build out-of-source, which is @@ -247,6 +250,16 @@ function(ADD_PARQUET_TEST_DEPENDENCIES REL_TEST_NAME) add_dependencies(${TEST_NAME} ${ARGN}) endfunction() +# A wrapper for add_dependencies() that is compatible with PARQUET_BUILD_TESTS. +function(ADD_PARQUET_LINK_LIBRARIES REL_TEST_NAME) + if(NOT PARQUET_BUILD_TESTS) + return() + endif() + get_filename_component(TEST_NAME ${REL_TEST_NAME} NAME_WE) + + target_link_libraries(${TEST_NAME} ${ARGN}) +endfunction() + enable_testing() ############################################################ @@ -553,6 +566,12 @@ if (PARQUET_BUILD_SHARED) target_link_libraries(parquet_shared LINK_PUBLIC ${LIBPARQUET_LINK_LIBS} LINK_PRIVATE ${LIBPARQUET_PRIVATE_LINK_LIBS}) + if (APPLE) + set_target_properties(parquet_shared + PROPERTIES + BUILD_WITH_INSTALL_RPATH ON + INSTALL_NAME_DIR "@rpath") + endif() endif() if (PARQUET_BUILD_STATIC) @@ -583,6 +602,22 @@ add_dependencies(parquet_objlib parquet_thrift) add_subdirectory(benchmarks) add_subdirectory(tools) +# Arrow +if (PARQUET_ARROW) + find_package(Arrow REQUIRED) + include_directories(SYSTEM ${ARROW_INCLUDE_DIR}) + add_library(arrow SHARED IMPORTED) + set_target_properties(arrow PROPERTIES IMPORTED_LOCATION ${ARROW_SHARED_LIB}) + add_library(arrow_io SHARED IMPORTED) + set_target_properties(arrow_io PROPERTIES IMPORTED_LOCATION ${ARROW_IO_SHARED_LIB}) + add_library(arrow_static STATIC IMPORTED) + set_target_properties(arrow_static PROPERTIES IMPORTED_LOCATION ${ARROW_STATIC_LIB}) + add_library(arrow_io_static STATIC IMPORTED) + set_target_properties(arrow_io_static PROPERTIES IMPORTED_LOCATION ${ARROW_IO_STATIC_LIB}) + + add_subdirectory(src/parquet/arrow) +endif() + add_custom_target(clean-all COMMAND ${CMAKE_BUILD_TOOL} clean COMMAND ${CMAKE_COMMAND} -P cmake_modules/clean-all.cmake diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh index 2e7dcdd1..8794559c 100755 --- a/ci/travis_script_cpp.sh +++ b/ci/travis_script_cpp.sh @@ -16,13 +16,13 @@ make lint if [ $TRAVIS_OS_NAME == "linux" ]; then make -j4 || exit 1 - ctest -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; } + ctest -VV -L unittest || { cat 
$TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; } sudo pip install cpp_coveralls export PARQUET_ROOT=$TRAVIS_BUILD_DIR $TRAVIS_BUILD_DIR/ci/upload_coverage.sh else make -j4 || exit 1 - ctest -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; } + ctest -VV -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; } fi popd diff --git a/cmake_modules/FindArrow.cmake b/cmake_modules/FindArrow.cmake new file mode 100644 index 00000000..91d0e714 --- /dev/null +++ b/cmake_modules/FindArrow.cmake @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# - Find ARROW (arrow/api.h, libarrow.a, libarrow.so) +# This module defines +# ARROW_INCLUDE_DIR, directory containing headers +# ARROW_LIBS, directory containing arrow libraries +# ARROW_STATIC_LIB, path to libarrow.a +# ARROW_SHARED_LIB, path to libarrow's shared library +# ARROW_FOUND, whether arrow has been found + +set(ARROW_SEARCH_HEADER_PATHS + $ENV{ARROW_HOME}/include +) + +set(ARROW_SEARCH_LIB_PATH + $ENV{ARROW_HOME}/lib +) + +find_path(ARROW_INCLUDE_DIR arrow/array.h PATHS + ${ARROW_SEARCH_HEADER_PATHS} + # make sure we don't accidentally pick up a different version + NO_DEFAULT_PATH +) + +find_library(ARROW_LIB_PATH NAMES arrow + PATHS + ${ARROW_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) + +find_library(ARROW_IO_LIB_PATH NAMES arrow_io + PATHS + ${ARROW_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) + +if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH) + set(ARROW_FOUND TRUE) + set(ARROW_LIB_NAME libarrow) + set(ARROW_IO_LIB_NAME libarrow_io) + + set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH}) + set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a) + set(ARROW_SHARED_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + + set(ARROW_IO_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IO_LIB_NAME}.a) + set(ARROW_IO_SHARED_LIB ${ARROW_LIBS}/${ARROW_IO_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + if (NOT Arrow_FIND_QUIETLY) + message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}") + message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}") + endif () +else () + if (NOT Arrow_FIND_QUIETLY) + set(ARROW_ERR_MSG "Could not find the Arrow library. 
Looked for headers") + set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_HEADER_PATHS}, and for libs") + set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_LIB_PATH}") + if (Arrow_FIND_REQUIRED) + message(FATAL_ERROR "${ARROW_ERR_MSG}") + else (Arrow_FIND_REQUIRED) + message(STATUS "${ARROW_ERR_MSG}") + endif (Arrow_FIND_REQUIRED) + endif () + set(ARROW_FOUND FALSE) +endif () + +mark_as_advanced( + ARROW_FOUND + ARROW_INCLUDE_DIR + ARROW_LIBS + ARROW_STATIC_LIB + ARROW_SHARED_LIB + ARROW_IO_STATIC_LIB + ARROW_IO_SHARED_LIB +) diff --git a/conda.recipe/build.sh b/conda.recipe/build.sh index 7fe8f91b..f77475c9 100644 --- a/conda.recipe/build.sh +++ b/conda.recipe/build.sh @@ -53,6 +53,7 @@ cmake \ -DCMAKE_BUILD_TYPE=debug \ -DCMAKE_INSTALL_PREFIX=$PREFIX \ -DPARQUET_BUILD_BENCHMARKS=off \ + -DPARQUET_ARROW=ON \ .. make diff --git a/src/parquet/arrow/CMakeLists.txt b/src/parquet/arrow/CMakeLists.txt new file mode 100644 index 00000000..5d923a7c --- /dev/null +++ b/src/parquet/arrow/CMakeLists.txt @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+# ----------------------------------------------------------------------
+# parquet_arrow : Arrow <-> Parquet adapter
+
+set(PARQUET_ARROW_SRCS
+  io.cc
+  reader.cc
+  schema.cc
+  writer.cc
+)
+
+add_library(parquet_arrow_objlib OBJECT
+  ${PARQUET_ARROW_SRCS}
+)
+
+# SET_TARGET_PROPERTIES(parquet_arrow PROPERTIES LINKER_LANGUAGE CXX)
+
+if (PARQUET_BUILD_SHARED)
+  add_library(parquet_arrow_shared SHARED $<TARGET_OBJECTS:parquet_arrow_objlib>)
+  set_target_properties(parquet_arrow_shared
+    PROPERTIES
+    LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+    LINK_FLAGS "${SHARED_LINK_FLAGS}"
+    OUTPUT_NAME "parquet_arrow")
+  target_link_libraries(parquet_arrow_shared
+    arrow
+    arrow_io
+    parquet_shared)
+  if (APPLE)
+    set_target_properties(parquet_arrow_shared
+      PROPERTIES
+      BUILD_WITH_INSTALL_RPATH ON
+      INSTALL_NAME_DIR "@rpath")
+  endif()
+endif()
+
+if (PARQUET_BUILD_STATIC)
+  add_library(parquet_arrow_static STATIC $<TARGET_OBJECTS:parquet_arrow_objlib>)
+  set_target_properties(parquet_arrow_static
+    PROPERTIES
+    LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+    OUTPUT_NAME "parquet_arrow")
+  target_link_libraries(parquet_arrow_static
+    arrow_static
+    arrow_io_static
+    parquet_static)
+  install(TARGETS parquet_arrow_static
+    ARCHIVE DESTINATION lib
+    LIBRARY DESTINATION lib)
+endif()
+
+ADD_PARQUET_TEST(arrow-schema-test)
+ADD_PARQUET_TEST(arrow-io-test)
+ADD_PARQUET_TEST(arrow-reader-writer-test)
+
+if (PARQUET_BUILD_STATIC)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_static)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_static)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_static)
+else()
+  ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_shared)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_shared)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_shared)
+endif()
+
+# Headers: top level
+install(FILES
+  io.h
+  reader.h
+  schema.h
+  utils.h
+  writer.h
+  DESTINATION include/parquet/arrow)
+
diff --git a/src/parquet/arrow/arrow-io-test.cc b/src/parquet/arrow/arrow-io-test.cc
new file mode 100644
index 00000000..377fb977
--- /dev/null
+++ b/src/parquet/arrow/arrow-io-test.cc
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
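For orientation before the test file below: a minimal sketch, not part of the patch, of how the two bridge classes this diff introduces in src/parquet/arrow/io.h are meant to be used together. Error handling is elided; the function name is hypothetical.

    // Sketch only: route parquet-cpp allocations through an Arrow MemoryPool.
    #include "arrow/util/memory-pool.h"
    #include "parquet/arrow/io.h"

    void AllocatorSketch() {
      // Wraps Arrow's default pool; parquet-cpp buffer allocations are then
      // visible to Arrow's memory accounting.
      parquet::arrow::ParquetAllocator allocator(::arrow::default_memory_pool());

      uint8_t* data = allocator.Malloc(64);  // throws ParquetException on failure
      // ... hand `allocator` to parquet-cpp APIs that accept a MemoryAllocator ...
      allocator.Free(data, 64);
    }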
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <string>
+
+#include "gtest/gtest.h"
+
+#include "arrow/test-util.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+#include "parquet/api/io.h"
+#include "parquet/arrow/io.h"
+
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+using arrow::Status;
+
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
+namespace parquet {
+namespace arrow {
+
+// Allocator tests
+
+TEST(TestParquetAllocator, DefaultCtor) {
+  ParquetAllocator allocator;
+
+  const int buffer_size = 10;
+
+  uint8_t* buffer = nullptr;
+  ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size););
+
+  // valgrind will complain if we write into nullptr
+  memset(buffer, 0, buffer_size);
+
+  allocator.Free(buffer, buffer_size);
+}
+
+// Pass through to the default memory pool
+class TrackingPool : public MemoryPool {
+ public:
+  TrackingPool() : pool_(default_memory_pool()), bytes_allocated_(0) {}
+
+  Status Allocate(int64_t size, uint8_t** out) override {
+    RETURN_NOT_OK(pool_->Allocate(size, out));
+    bytes_allocated_ += size;
+    return Status::OK();
+  }
+
+  void Free(uint8_t* buffer, int64_t size) override {
+    pool_->Free(buffer, size);
+    bytes_allocated_ -= size;
+  }
+
+  int64_t bytes_allocated() const override { return bytes_allocated_; }
+
+ private:
+  MemoryPool* pool_;
+  int64_t bytes_allocated_;
+};
+
+TEST(TestParquetAllocator, CustomPool) {
+  TrackingPool pool;
+
+  ParquetAllocator allocator(&pool);
+
+  ASSERT_EQ(&pool, allocator.pool());
+
+  const int buffer_size = 10;
+
+  uint8_t* buffer = nullptr;
+  ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size););
+
+  ASSERT_EQ(buffer_size, pool.bytes_allocated());
+
+  // valgrind will complain if we write into nullptr
+  memset(buffer, 0, buffer_size);
+
+  allocator.Free(buffer, buffer_size);
+
+  ASSERT_EQ(0, pool.bytes_allocated());
+}
+
+// ----------------------------------------------------------------------
+// Read source tests
+
+class BufferReader : public ArrowROFile {
+ public:
+  BufferReader(const uint8_t* buffer, int buffer_size)
+      : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
+
+  Status Close() override {
+    // no-op
+    return Status::OK();
+  }
+
+  Status Tell(int64_t* position) override {
+    *position = position_;
+    return Status::OK();
+  }
+
+  Status ReadAt(
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
+    RETURN_NOT_OK(Seek(position));
+    return Read(nbytes, bytes_read, buffer);
+  }
+
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
+    // Clamp to the bytes remaining before copying so a short read cannot run
+    // past the end of the source buffer.
+    *bytes_read = std::min(nbytes, buffer_size_ - position_);
+    memcpy(buffer, buffer_ + position_, *bytes_read);
+    position_ += *bytes_read;
+    return Status::OK();
+  }
+
+  Status GetSize(int64_t* size) override {
+    *size = buffer_size_;
+    return Status::OK();
+  }
+
+  Status Seek(int64_t position) override {
+    if (position < 0 || position >= buffer_size_) {
+      return Status::IOError("position out of bounds");
+    }
+
+    position_ = position;
+    return Status::OK();
+  }
+
+ private:
+  const uint8_t* buffer_;
+  int buffer_size_;
+  int64_t position_;
+};
+
+TEST(TestParquetReadSource, Basics) {
+  std::string data = "this is the data";
+  auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
+
+  ParquetAllocator allocator(default_memory_pool());
+
+  auto file = std::make_shared<BufferReader>(data_buffer, data.size());
+  auto source = std::make_shared<ParquetReadSource>(&allocator);
+
+  ASSERT_OK(source->Open(file));
+
+  ASSERT_EQ(0, source->Tell());
+  ASSERT_NO_THROW(source->Seek(5));
+  ASSERT_EQ(5,
source->Tell()); + ASSERT_NO_THROW(source->Seek(0)); + + // Seek out of bounds + ASSERT_THROW(source->Seek(100), ParquetException); + + uint8_t buffer[50]; + + ASSERT_NO_THROW(source->Read(4, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, "this", 4)); + ASSERT_EQ(4, source->Tell()); + + std::shared_ptr pq_buffer; + + ASSERT_NO_THROW(pq_buffer = source->Read(7)); + + auto expected_buffer = std::make_shared(data_buffer + 4, 7); + + ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get())); +} + +} // namespace arrow +} // namespace parquet diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc new file mode 100644 index 00000000..e4e9efa1 --- /dev/null +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -0,0 +1,502 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gtest/gtest.h" + +#include "parquet/api/reader.h" +#include "parquet/api/writer.h" + +#include "parquet/arrow/test-util.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/writer.h" + +#include "arrow/test-util.h" +#include "arrow/types/construct.h" +#include "arrow/types/primitive.h" +#include "arrow/types/string.h" +#include "arrow/util/memory-pool.h" +#include "arrow/util/status.h" + +using arrow::Array; +using arrow::ChunkedArray; +using arrow::default_memory_pool; +using arrow::PoolBuffer; +using arrow::PrimitiveArray; +using arrow::Status; +using arrow::Table; + +using ParquetBuffer = parquet::Buffer; +using ParquetType = parquet::Type; +using parquet::schema::GroupNode; +using parquet::schema::NodePtr; +using parquet::schema::PrimitiveNode; + +namespace parquet { + +namespace arrow { + +const int SMALL_SIZE = 100; +const int LARGE_SIZE = 10000; + +template +struct test_traits {}; + +template <> +struct test_traits<::arrow::BooleanType> { + static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static uint8_t const value; +}; + +const uint8_t test_traits<::arrow::BooleanType>::value(1); + +template <> +struct test_traits<::arrow::UInt8Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_8; + static uint8_t const value; +}; + +const uint8_t test_traits<::arrow::UInt8Type>::value(64); + +template <> +struct test_traits<::arrow::Int8Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::INT_8; + static int8_t const value; +}; + +const int8_t test_traits<::arrow::Int8Type>::value(-64); + +template <> +struct test_traits<::arrow::UInt16Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr 
LogicalType::type logical_enum = LogicalType::UINT_16; + static uint16_t const value; +}; + +const uint16_t test_traits<::arrow::UInt16Type>::value(1024); + +template <> +struct test_traits<::arrow::Int16Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::INT_16; + static int16_t const value; +}; + +const int16_t test_traits<::arrow::Int16Type>::value(-1024); + +template <> +struct test_traits<::arrow::UInt32Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_32; + static uint32_t const value; +}; + +const uint32_t test_traits<::arrow::UInt32Type>::value(1024); + +template <> +struct test_traits<::arrow::Int32Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static int32_t const value; +}; + +const int32_t test_traits<::arrow::Int32Type>::value(-1024); + +template <> +struct test_traits<::arrow::UInt64Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT64; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_64; + static uint64_t const value; +}; + +const uint64_t test_traits<::arrow::UInt64Type>::value(1024); + +template <> +struct test_traits<::arrow::Int64Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT64; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static int64_t const value; +}; + +const int64_t test_traits<::arrow::Int64Type>::value(-1024); + +template <> +struct test_traits<::arrow::TimestampType> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT64; + static constexpr LogicalType::type logical_enum = LogicalType::TIMESTAMP_MILLIS; + static int64_t const value; +}; + +const int64_t test_traits<::arrow::TimestampType>::value(14695634030000); + +template <> +struct test_traits<::arrow::FloatType> { + static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static float const value; +}; + +const float test_traits<::arrow::FloatType>::value(2.1f); + +template <> +struct test_traits<::arrow::DoubleType> { + static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static double const value; +}; + +const double test_traits<::arrow::DoubleType>::value(4.2); + +template <> +struct test_traits<::arrow::StringType> { + static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY; + static constexpr LogicalType::type logical_enum = LogicalType::UTF8; + static std::string const value; +}; + +const std::string test_traits<::arrow::StringType>::value("Test"); + +template +using ParquetDataType = DataType::parquet_enum>; + +template +using ParquetWriter = TypedColumnWriter>; + +template +class TestParquetIO : public ::testing::Test { + public: + virtual void SetUp() {} + + std::shared_ptr MakeSchema(Repetition::type repetition) { + auto pnode = PrimitiveNode::Make("column1", repetition, + test_traits::parquet_enum, test_traits::logical_enum); + NodePtr node_ = + GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); + return std::static_pointer_cast(node_); + } + + std::unique_ptr MakeWriter( + const std::shared_ptr& schema) { + sink_ = std::make_shared(); + return ParquetFileWriter::Open(sink_, 
schema);
+  }
+
+  std::unique_ptr<ParquetFileReader> ReaderFromSink() {
+    std::shared_ptr<Buffer> buffer = sink_->GetBuffer();
+    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+    return ParquetFileReader::Open(std::move(source));
+  }
+
+  void ReadSingleColumnFile(
+      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) {
+    FileReader reader(::arrow::default_memory_pool(), std::move(file_reader));
+    std::unique_ptr<FlatColumnReader> column_reader;
+    ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader));
+    ASSERT_NE(nullptr, column_reader.get());
+
+    ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
+    ASSERT_NE(nullptr, out->get());
+  }
+
+  void ReadAndCheckSingleColumnFile(::arrow::Array* values) {
+    std::shared_ptr<::arrow::Array> out;
+    ReadSingleColumnFile(ReaderFromSink(), &out);
+    ASSERT_TRUE(values->Equals(out));
+  }
+
+  void ReadTableFromFile(
+      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) {
+    FileReader reader(::arrow::default_memory_pool(), std::move(file_reader));
+    ASSERT_OK_NO_THROW(reader.ReadFlatTable(out));
+    ASSERT_NE(nullptr, out->get());
+  }
+
+  void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
+    std::shared_ptr<::arrow::Table> out;
+    ReadTableFromFile(ReaderFromSink(), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(values->length(), out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+  }
+
+  template <typename T>
+  void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema,
+      const std::shared_ptr<Array>& values) {
+    FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema));
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
+    ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get()));
+    ASSERT_OK_NO_THROW(writer.Close());
+  }
+
+  std::shared_ptr<InMemoryOutputStream> sink_;
+};
+
+// We have separate tests for UInt32Type as this is currently the only type
+// where a roundtrip does not yield the identical Array structure.
+// There we write an UInt32 Array but receive an Int64 Array as result for
+// Parquet version 1.0.
+
+typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
+    ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
+    ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
+    ::arrow::StringType> TestTypes;
+
+TYPED_TEST_CASE(TestParquetIO, TestTypes);
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  this->WriteFlatColumn<TypeParam>(schema, values);
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
+      this->sink_, values->length(), default_writer_properties()));
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(this->ReaderFromSink(), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(100, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
+  // This also tests max_definition_level = 1
+  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  this->WriteFlatColumn<TypeParam>(schema, values);
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<Array> values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
+      this->sink_, values->length(), default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+  int64_t chunk_size = values->length() / 4;
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
+  for (int i = 0; i < 4; i++) {
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(
+        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+  }
+  ASSERT_OK_NO_THROW(writer.Close());
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
+  auto values = NonNullArray<TypeParam>(LARGE_SIZE);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
+  int64_t chunk_size = SMALL_SIZE / 4;
+  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
+  for (int i = 0; i < 4; i++) {
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(
+        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+  }
+  ASSERT_OK_NO_THROW(writer.Close());
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
+  // This also tests max_definition_level = 1
+  auto values = NullableArray<TypeParam>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
+      this->sink_, 512, default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
+
+TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compatibility) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<Array> values =
+      NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+  // Parquet 2.0 roundtrip should yield an uint32_t column again
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  std::shared_ptr<::parquet::WriterProperties> properties =
+      ::parquet::WriterProperties::Builder()
+          .version(ParquetVersion::PARQUET_2_0)
+          ->build();
+  ASSERT_OK_NO_THROW(
+      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compatibility) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<Array> values =
+      NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+  // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0
+  // reader that a column is unsigned.
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  std::shared_ptr<::parquet::WriterProperties> properties =
+      ::parquet::WriterProperties::Builder()
+          .version(ParquetVersion::PARQUET_1_0)
+          ->build();
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), ::arrow::default_memory_pool(), this->sink_, 512, properties));
+
+  std::shared_ptr<Array> expected_values;
+  std::shared_ptr<PoolBuffer> int64_data =
+      std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
+  {
+    ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
+    int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
+    const uint32_t* uint32_data_ptr =
+        reinterpret_cast<const uint32_t*>(values->data()->data());
+    // std::copy might be faster but this is explicit about the casts
+    for (int64_t i = 0; i < values->length(); i++) {
+      int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
+    }
+  }
+  ASSERT_OK(MakePrimitiveArray(std::make_shared<::arrow::Int64Type>(), values->length(),
+      int64_data, values->null_count(), values->null_bitmap(), &expected_values));
+  this->ReadAndCheckSingleColumnTable(expected_values);
+}
+
+template <typename T>
+using ParquetCDataType = typename ParquetDataType<T>::c_type;
+
+template <typename TestType>
+class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
+ public:
+  typedef typename TestType::c_type T;
+
+  void MakeTestFile(std::vector<T>& values, int num_chunks,
+      std::unique_ptr<ParquetFileReader>* file_reader) {
+    std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+    std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
+    size_t chunk_size = values.size() / num_chunks;
+    // Convert to Parquet's expected physical type
+    std::vector<uint8_t> values_buffer(
+        sizeof(ParquetCDataType<TestType>) * values.size());
+    auto values_parquet =
+        reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
+    std::copy(values.cbegin(), values.cend(), values_parquet);
+    for (int i = 0; i < num_chunks; i++) {
+      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+      auto column_writer =
+          static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
+      ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
+      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
+      column_writer->Close();
+      row_group_writer->Close();
+    }
+    file_writer->Close();
+    *file_reader = this->ReaderFromSink();
+  }
+
+  void CheckSingleColumnRequiredTableRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Table> out;
+    this->ReadTableFromFile(std::move(file_reader), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ExpectArray<T>(values.data(), chunked_array->chunk(0).get());
+  }
+
+  void CheckSingleColumnRequiredRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Array> out;
+    this->ReadSingleColumnFile(std::move(file_reader), &out);
+
+    ExpectArray<T>(values.data(), out.get());
+  }
+};
+
+typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
+    ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, ::arrow::Int32Type,
+    ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType,
+    ::arrow::DoubleType> PrimitiveTestTypes;
+
+TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
+  this->CheckSingleColumnRequiredRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
+  this->CheckSingleColumnRequiredTableRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
+  this->CheckSingleColumnRequiredRead(4);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
+  this->CheckSingleColumnRequiredTableRead(4);
+}
+
+}  // namespace arrow
+
+}  // namespace parquet
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
new file mode 100644
index 00000000..3dfaf149
--- /dev/null
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -0,0 +1,265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
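To make the intent of the schema tests that follow easier to see: a small sketch of the round trip they exercise. It assumes only that FromParquetSchema and ToParquetSchema are declared in parquet/arrow/schema.h with the signatures used by these tests; the function name is hypothetical.

    // Sketch: Parquet <-> Arrow schema round trip, mirroring the tests below.
    #include "parquet/arrow/schema.h"

    ::arrow::Status RoundTripSketch(const parquet::SchemaDescriptor* descr) {
      // Parquet -> Arrow
      std::shared_ptr<::arrow::Schema> arrow_schema;
      RETURN_NOT_OK(parquet::arrow::FromParquetSchema(descr, &arrow_schema));

      // Arrow -> Parquet, using the default writer properties
      std::shared_ptr<parquet::SchemaDescriptor> parquet_schema;
      auto properties = parquet::default_writer_properties();
      return parquet::arrow::ToParquetSchema(
          arrow_schema.get(), *properties, &parquet_schema);
    }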
+ +#include +#include + +#include "gtest/gtest.h" + +#include "parquet/arrow/schema.h" + +#include "arrow/test-util.h" +#include "arrow/type.h" +#include "arrow/types/datetime.h" +#include "arrow/types/decimal.h" +#include "arrow/util/status.h" + +using arrow::Field; + +using ParquetType = parquet::Type; +using parquet::LogicalType; +using parquet::Repetition; +using parquet::schema::NodePtr; +using parquet::schema::GroupNode; +using parquet::schema::PrimitiveNode; + +namespace parquet { + +namespace arrow { + +const auto BOOL = std::make_shared<::arrow::BooleanType>(); +const auto UINT8 = std::make_shared<::arrow::UInt8Type>(); +const auto INT32 = std::make_shared<::arrow::Int32Type>(); +const auto INT64 = std::make_shared<::arrow::Int64Type>(); +const auto FLOAT = std::make_shared<::arrow::FloatType>(); +const auto DOUBLE = std::make_shared<::arrow::DoubleType>(); +const auto UTF8 = std::make_shared<::arrow::StringType>(); +const auto TIMESTAMP_MS = + std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI); +// TODO: This requires parquet-cpp implementing the MICROS enum value +// const auto TIMESTAMP_US = std::make_shared(TimestampType::Unit::MICRO); +const auto BINARY = + std::make_shared<::arrow::ListType>(std::make_shared("", UINT8)); +const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4); + +class TestConvertParquetSchema : public ::testing::Test { + public: + virtual void SetUp() {} + + void CheckFlatSchema(const std::shared_ptr<::arrow::Schema>& expected_schema) { + ASSERT_EQ(expected_schema->num_fields(), result_schema_->num_fields()); + for (int i = 0; i < expected_schema->num_fields(); ++i) { + auto lhs = result_schema_->field(i); + auto rhs = expected_schema->field(i); + EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString() + << " != " << rhs->ToString(); + } + } + + ::arrow::Status ConvertSchema(const std::vector& nodes) { + NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); + descr_.Init(schema); + return FromParquetSchema(&descr_, &result_schema_); + } + + protected: + SchemaDescriptor descr_; + std::shared_ptr<::arrow::Schema> result_schema_; +}; + +TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { + std::vector parquet_fields; + std::vector> arrow_fields; + + parquet_fields.push_back( + PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN)); + arrow_fields.push_back(std::make_shared("boolean", BOOL, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32)); + arrow_fields.push_back(std::make_shared("int32", INT32, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64)); + arrow_fields.push_back(std::make_shared("int64", INT64, false)); + + parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, + ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); + arrow_fields.push_back(std::make_shared("timestamp", TIMESTAMP_MS, false)); + + // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, + // ParquetType::INT64, LogicalType::TIMESTAMP_MICROS)); + // arrow_fields.push_back(std::make_shared("timestamp", TIMESTAMP_US, false)); + + parquet_fields.push_back( + PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT)); + arrow_fields.push_back(std::make_shared("float", FLOAT)); + + parquet_fields.push_back( + PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE)); + 
arrow_fields.push_back(std::make_shared("double", DOUBLE)); + + parquet_fields.push_back( + PrimitiveNode::Make("binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY)); + arrow_fields.push_back(std::make_shared("binary", BINARY)); + + parquet_fields.push_back(PrimitiveNode::Make( + "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8)); + arrow_fields.push_back(std::make_shared("string", UTF8)); + + parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL, + ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, 12)); + arrow_fields.push_back(std::make_shared("flba-binary", BINARY)); + + auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); + ASSERT_OK(ConvertSchema(parquet_fields)); + + CheckFlatSchema(arrow_schema); +} + +TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) { + std::vector parquet_fields; + std::vector> arrow_fields; + + parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL, + ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 4, 8, 4)); + arrow_fields.push_back(std::make_shared("flba-decimal", DECIMAL_8_4)); + + parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL, + ParquetType::BYTE_ARRAY, LogicalType::DECIMAL, -1, 8, 4)); + arrow_fields.push_back(std::make_shared("binary-decimal", DECIMAL_8_4)); + + parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL, + ParquetType::INT32, LogicalType::DECIMAL, -1, 8, 4)); + arrow_fields.push_back(std::make_shared("int32-decimal", DECIMAL_8_4)); + + parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL, + ParquetType::INT64, LogicalType::DECIMAL, -1, 8, 4)); + arrow_fields.push_back(std::make_shared("int64-decimal", DECIMAL_8_4)); + + auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); + ASSERT_OK(ConvertSchema(parquet_fields)); + + CheckFlatSchema(arrow_schema); +} + +TEST_F(TestConvertParquetSchema, UnsupportedThings) { + std::vector unsupported_nodes; + + unsupported_nodes.push_back( + PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96)); + + unsupported_nodes.push_back( + GroupNode::Make("repeated-group", Repetition::REPEATED, {})); + + unsupported_nodes.push_back(PrimitiveNode::Make( + "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE)); + + for (const NodePtr& node : unsupported_nodes) { + ASSERT_RAISES(NotImplemented, ConvertSchema({node})); + } +} + +class TestConvertArrowSchema : public ::testing::Test { + public: + virtual void SetUp() {} + + void CheckFlatSchema(const std::vector& nodes) { + NodePtr schema_node = GroupNode::Make("schema", Repetition::REPEATED, nodes); + const GroupNode* expected_schema_node = + static_cast(schema_node.get()); + const GroupNode* result_schema_node = result_schema_->group_node(); + + ASSERT_EQ(expected_schema_node->field_count(), result_schema_node->field_count()); + + for (int i = 0; i < expected_schema_node->field_count(); i++) { + auto lhs = result_schema_node->field(i); + auto rhs = expected_schema_node->field(i); + EXPECT_TRUE(lhs->Equals(rhs.get())); + } + } + + ::arrow::Status ConvertSchema(const std::vector>& fields) { + arrow_schema_ = std::make_shared<::arrow::Schema>(fields); + std::shared_ptr<::parquet::WriterProperties> properties = + ::parquet::default_writer_properties(); + return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_); + } + + protected: + std::shared_ptr<::arrow::Schema> arrow_schema_; + std::shared_ptr 
<SchemaDescriptor> result_schema_;
+};
+
+TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
+  arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
+  arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
+  arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+
+  parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
+      ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
+  arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
+
+  // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
+  //     ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
+  // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
+  arrow_fields.push_back(std::make_shared<Field>("float", FLOAT));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
+  arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));
+
+  // TODO: String types need to be clarified a bit more in the Arrow spec
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8));
+  arrow_fields.push_back(std::make_shared<Field>("string", UTF8));
+
+  ASSERT_OK(ConvertSchema(arrow_fields));
+
+  CheckFlatSchema(parquet_fields);
+}
+
+TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  // TODO: Test Decimal Arrow -> Parquet conversion
+
+  ASSERT_OK(ConvertSchema(arrow_fields));
+
+  CheckFlatSchema(parquet_fields);
+}
+
+TEST(TestNodeConversion, DateAndTime) {}
+
+}  // namespace arrow
+
+}  // namespace parquet
diff --git a/src/parquet/arrow/io.cc b/src/parquet/arrow/io.cc
new file mode 100644
index 00000000..8e2645ab
--- /dev/null
+++ b/src/parquet/arrow/io.cc
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
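Before the implementation below, a short usage sketch for ParquetReadSource. It mirrors TestParquetReadSource in arrow-io-test.cc; `file` stands for any Arrow random-access file, and the function name is hypothetical.

    // Sketch: adapt an arrow::io::RandomAccessFile into a parquet-cpp source.
    #include <memory>
    #include "parquet/arrow/io.h"

    ::arrow::Status OpenSourceSketch(
        const std::shared_ptr<::arrow::io::RandomAccessFile>& file) {
      parquet::arrow::ParquetAllocator allocator;
      auto source = std::make_shared<parquet::arrow::ParquetReadSource>(&allocator);
      RETURN_NOT_OK(source->Open(file));  // asks the file for its size up front

      source->Seek(0);  // throws ParquetException on an out-of-bounds position
      int64_t pos = source->Tell();
      (void)pos;  // 0 on a freshly opened source
      return ::arrow::Status::OK();
    }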
+
+#include "parquet/arrow/io.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/util/status.h"
+
+using arrow::Status;
+using arrow::MemoryPool;
+
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
+namespace parquet {
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// ParquetAllocator
+
+ParquetAllocator::ParquetAllocator() : pool_(::arrow::default_memory_pool()) {}
+
+ParquetAllocator::ParquetAllocator(MemoryPool* pool) : pool_(pool) {}
+
+ParquetAllocator::~ParquetAllocator() {}
+
+uint8_t* ParquetAllocator::Malloc(int64_t size) {
+  uint8_t* result;
+  PARQUET_THROW_NOT_OK(pool_->Allocate(size, &result));
+  return result;
+}
+
+void ParquetAllocator::Free(uint8_t* buffer, int64_t size) {
+  // Does not report Status
+  pool_->Free(buffer, size);
+}
+
+// ----------------------------------------------------------------------
+// ParquetReadSource
+
+ParquetReadSource::ParquetReadSource(ParquetAllocator* allocator)
+    : file_(nullptr), allocator_(allocator) {}
+
+Status ParquetReadSource::Open(const std::shared_ptr<ArrowROFile>& file) {
+  int64_t file_size;
+  RETURN_NOT_OK(file->GetSize(&file_size));
+
+  file_ = file;
+  size_ = file_size;
+  return Status::OK();
+}
+
+void ParquetReadSource::Close() {
+  // TODO(wesm): Make this a no-op for now. This leaves Python wrappers for
+  // these classes in a borked state. Probably better to explicitly close.
+
+  // PARQUET_THROW_NOT_OK(file_->Close());
+}
+
+int64_t ParquetReadSource::Tell() const {
+  int64_t position;
+  PARQUET_THROW_NOT_OK(file_->Tell(&position));
+  return position;
+}
+
+void ParquetReadSource::Seek(int64_t position) {
+  PARQUET_THROW_NOT_OK(file_->Seek(position));
+}
+
+int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) {
+  int64_t bytes_read;
+  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out));
+  return bytes_read;
+}
+
+std::shared_ptr<Buffer> ParquetReadSource::Read(int64_t nbytes) {
+  // TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests
+  // that there should be more code sharing amongst file-like sources
+  auto result = std::make_shared<OwnedMutableBuffer>(0, allocator_);
+  result->Resize(nbytes);
+
+  int64_t bytes_read = Read(nbytes, result->mutable_data());
+  if (bytes_read < nbytes) { result->Resize(bytes_read); }
+  return result;
+}
+
+}  // namespace arrow
+}  // namespace parquet
diff --git a/src/parquet/arrow/io.h b/src/parquet/arrow/io.h
new file mode 100644
index 00000000..dc606358
--- /dev/null
+++ b/src/parquet/arrow/io.h
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Bridges Arrow's IO interfaces and Parquet-cpp's IO interfaces
+
+#ifndef PARQUET_ARROW_IO_H
+#define PARQUET_ARROW_IO_H
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/memory-pool.h"
+
+namespace parquet {
+
+namespace arrow {
+
+// An implementation of the Parquet MemoryAllocator API that plugs into an
+// existing Arrow memory pool. This way we can direct all allocations to a
+// single place rather than tracking allocations in different locations (for
+// example: without utilizing parquet-cpp's default allocator)
+class PARQUET_EXPORT ParquetAllocator : public MemoryAllocator {
+ public:
+  // Uses the default memory pool
+  ParquetAllocator();
+
+  explicit ParquetAllocator(::arrow::MemoryPool* pool);
+  virtual ~ParquetAllocator();
+
+  uint8_t* Malloc(int64_t size) override;
+  void Free(uint8_t* buffer, int64_t size) override;
+
+  void set_pool(::arrow::MemoryPool* pool) { pool_ = pool; }
+
+  ::arrow::MemoryPool* pool() const { return pool_; }
+
+ private:
+  ::arrow::MemoryPool* pool_;
+};
+
+class PARQUET_EXPORT ParquetReadSource : public RandomAccessSource {
+ public:
+  explicit ParquetReadSource(ParquetAllocator* allocator);
+
+  // We need to ask for the file size on opening the file, and this can fail
+  ::arrow::Status Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file);
+
+  void Close() override;
+  int64_t Tell() const override;
+  void Seek(int64_t pos) override;
+  int64_t Read(int64_t nbytes, uint8_t* out) override;
+  std::shared_ptr<Buffer> Read(int64_t nbytes) override;
+
+ private:
+  // An Arrow readable file of some kind
+  std::shared_ptr<::arrow::io::RandomAccessFile> file_;
+
+  // The allocator is required for creating managed buffers
+  ParquetAllocator* allocator_;
+};
+
+}  // namespace arrow
+}  // namespace parquet
+
+#endif  // PARQUET_ARROW_IO_H
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
new file mode 100644
index 00000000..056b5abd
--- /dev/null
+++ b/src/parquet/arrow/reader.cc
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
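The reader implementation follows. For context, a sketch of the high-level entry point it enables; OpenFile, FileReader and ReadFlatTable are declared in parquet/arrow/reader.h as part of this patch, while the function name here is hypothetical.

    // Sketch: read an entire Parquet file from Arrow IO into an arrow::Table.
    #include <memory>
    #include "parquet/arrow/reader.h"

    ::arrow::Status ReadTableSketch(
        const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
        std::shared_ptr<::arrow::Table>* out) {
      parquet::arrow::ParquetAllocator allocator;
      std::unique_ptr<parquet::arrow::FileReader> reader;
      // Wraps the Arrow file in a ParquetReadSource and opens the file reader
      RETURN_NOT_OK(parquet::arrow::OpenFile(file, &allocator, &reader));
      return reader->ReadFlatTable(out);
    }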
+ +#include "parquet/arrow/reader.h" + +#include +#include +#include +#include + +#include "parquet/arrow/io.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/utils.h" + +#include "arrow/column.h" +#include "arrow/schema.h" +#include "arrow/table.h" +#include "arrow/types/primitive.h" +#include "arrow/types/string.h" +#include "arrow/util/status.h" + +using arrow::Array; +using arrow::Column; +using arrow::Field; +using arrow::MemoryPool; +using arrow::PoolBuffer; +using arrow::Status; +using arrow::Table; + +// Help reduce verbosity +using ParquetRAS = parquet::RandomAccessSource; +using ParquetReader = parquet::ParquetFileReader; + +namespace parquet { +namespace arrow { + +template +struct ArrowTypeTraits { + typedef ::arrow::NumericBuilder builder_type; +}; + +template <> +struct ArrowTypeTraits { + typedef ::arrow::BooleanBuilder builder_type; +}; + +template +using BuilderType = typename ArrowTypeTraits::builder_type; + +class FileReader::Impl { + public: + Impl(MemoryPool* pool, std::unique_ptr reader); + virtual ~Impl() {} + + bool CheckForFlatColumn(const ColumnDescriptor* descr); + Status GetFlatColumn(int i, std::unique_ptr* out); + Status ReadFlatColumn(int i, std::shared_ptr* out); + Status ReadFlatTable(std::shared_ptr
* out); + + private: + MemoryPool* pool_; + std::unique_ptr reader_; +}; + +class FlatColumnReader::Impl { + public: + Impl(MemoryPool* pool, const ColumnDescriptor* descr, + ParquetFileReader* reader, int column_index); + virtual ~Impl() {} + + Status NextBatch(int batch_size, std::shared_ptr* out); + template + Status TypedReadBatch(int batch_size, std::shared_ptr* out); + + template + Status ReadNullableFlatBatch(const int16_t* def_levels, + typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read, + BuilderType* builder); + template + Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read, + BuilderType* builder); + + private: + void NextRowGroup(); + + template + struct can_copy_ptr { + static constexpr bool value = + std::is_same::value || + (std::is_integral{} && std::is_integral{} && + (sizeof(InType) == sizeof(OutType))); + }; + + template ::value>::type* = nullptr> + Status ConvertPhysicalType( + const InType* in_ptr, int64_t length, const OutType** out_ptr) { + *out_ptr = reinterpret_cast(in_ptr); + return Status::OK(); + } + + template ::value>::type* = nullptr> + Status ConvertPhysicalType( + const InType* in_ptr, int64_t length, const OutType** out_ptr) { + RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType))); + OutType* mutable_out_ptr = + reinterpret_cast(values_builder_buffer_.mutable_data()); + std::copy(in_ptr, in_ptr + length, mutable_out_ptr); + *out_ptr = mutable_out_ptr; + return Status::OK(); + } + + MemoryPool* pool_; + const ColumnDescriptor* descr_; + ParquetFileReader* reader_; + int column_index_; + int next_row_group_; + std::shared_ptr column_reader_; + std::shared_ptr field_; + + PoolBuffer values_buffer_; + PoolBuffer def_levels_buffer_; + PoolBuffer values_builder_buffer_; + PoolBuffer valid_bytes_buffer_; +}; + +FileReader::Impl::Impl( + MemoryPool* pool, std::unique_ptr reader) + : pool_(pool), reader_(std::move(reader)) {} + +bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) { + if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) { + return false; + } else if ((descr->max_definition_level() == 1) && + (descr->schema_node()->repetition() != Repetition::OPTIONAL)) { + return false; + } + return true; +} + +Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr* out) { + const SchemaDescriptor* schema = reader_->metadata()->schema(); + + if (!CheckForFlatColumn(schema->Column(i))) { + return Status::Invalid("The requested column is not flat"); + } + std::unique_ptr impl( + new FlatColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i)); + *out = std::unique_ptr(new FlatColumnReader(std::move(impl))); + return Status::OK(); +} + +Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr* out) { + std::unique_ptr flat_column_reader; + RETURN_NOT_OK(GetFlatColumn(i, &flat_column_reader)); + return flat_column_reader->NextBatch(reader_->metadata()->num_rows(), out); +} + +Status FileReader::Impl::ReadFlatTable(std::shared_ptr
* table) { + auto descr = reader_->metadata()->schema(); + + const std::string& name = descr->name(); + std::shared_ptr<::arrow::Schema> schema; + RETURN_NOT_OK(FromParquetSchema(descr, &schema)); + + int num_columns = reader_->metadata()->num_columns(); + + std::vector> columns(num_columns); + for (int i = 0; i < num_columns; i++) { + std::shared_ptr array; + RETURN_NOT_OK(ReadFlatColumn(i, &array)); + columns[i] = std::make_shared(schema->field(i), array); + } + + *table = std::make_shared
(name, schema, columns); + return Status::OK(); +} + +FileReader::FileReader( + MemoryPool* pool, std::unique_ptr reader) + : impl_(new FileReader::Impl(pool, std::move(reader))) {} + +FileReader::~FileReader() {} + +// Static ctor +Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, + ParquetAllocator* allocator, std::unique_ptr* reader) { + std::unique_ptr source(new ParquetReadSource(allocator)); + RETURN_NOT_OK(source->Open(file)); + + // TODO(wesm): reader properties + std::unique_ptr pq_reader; + PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(std::move(source))); + + // Use the same memory pool as the ParquetAllocator + reader->reset(new FileReader(allocator->pool(), std::move(pq_reader))); + return Status::OK(); +} + +Status FileReader::GetFlatColumn(int i, std::unique_ptr* out) { + return impl_->GetFlatColumn(i, out); +} + +Status FileReader::ReadFlatColumn(int i, std::shared_ptr* out) { + return impl_->ReadFlatColumn(i, out); +} + +Status FileReader::ReadFlatTable(std::shared_ptr
* out) { + return impl_->ReadFlatTable(out); +} + +FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr, + ParquetFileReader* reader, int column_index) + : pool_(pool), + descr_(descr), + reader_(reader), + column_index_(column_index), + next_row_group_(0), + values_buffer_(pool), + def_levels_buffer_(pool) { + NodeToField(descr_->schema_node(), &field_); + NextRowGroup(); +} + +template +Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values, + int64_t values_read, BuilderType* builder) { + using ArrowCType = typename ArrowType::c_type; + using ParquetCType = typename ParquetType::c_type; + + DCHECK(builder); + const ArrowCType* values_ptr = nullptr; + RETURN_NOT_OK( + (ConvertPhysicalType(values, values_read, &values_ptr))); + RETURN_NOT_OK(builder->Append(values_ptr, values_read)); + return Status::OK(); +} + +template +Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels, + typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read, + BuilderType* builder) { + using ArrowCType = typename ArrowType::c_type; + + DCHECK(builder); + RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType))); + RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t))); + auto values_ptr = reinterpret_cast(values_builder_buffer_.mutable_data()); + uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data(); + int values_idx = 0; + for (int64_t i = 0; i < levels_read; i++) { + if (def_levels[i] < descr_->max_definition_level()) { + valid_bytes[i] = 0; + } else { + valid_bytes[i] = 1; + values_ptr[i] = values[values_idx++]; + } + } + RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes)); + return Status::OK(); +} + +template +Status FlatColumnReader::Impl::TypedReadBatch( + int batch_size, std::shared_ptr* out) { + using ParquetCType = typename ParquetType::c_type; + + int values_to_read = batch_size; + BuilderType builder(pool_, field_->type); + while ((values_to_read > 0) && column_reader_) { + values_buffer_.Resize(values_to_read * sizeof(ParquetCType)); + if (descr_->max_definition_level() > 0) { + def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); + } + auto reader = dynamic_cast*>(column_reader_.get()); + int64_t values_read; + int64_t levels_read; + int16_t* def_levels = reinterpret_cast(def_levels_buffer_.mutable_data()); + auto values = reinterpret_cast(values_buffer_.mutable_data()); + PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch( + values_to_read, def_levels, nullptr, values, &values_read)); + values_to_read -= levels_read; + if (descr_->max_definition_level() == 0) { + RETURN_NOT_OK( + (ReadNonNullableBatch(values, values_read, &builder))); + } else { + // As per the defintion and checks for flat columns: + // descr_->max_definition_level() == 1 + RETURN_NOT_OK((ReadNullableFlatBatch( + def_levels, values, values_read, levels_read, &builder))); + } + if (!column_reader_->HasNext()) { NextRowGroup(); } + } + *out = builder.Finish(); + return Status::OK(); +} + +template <> +Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>( + int batch_size, std::shared_ptr* out) { + int values_to_read = batch_size; + ::arrow::StringBuilder builder(pool_, field_->type); + while ((values_to_read > 0) && column_reader_) { + values_buffer_.Resize(values_to_read * sizeof(ByteArray)); + if (descr_->max_definition_level() > 0) { + def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); + } + auto reader = + 
+template <>
+Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
+    int batch_size, std::shared_ptr<Array>* out) {
+  int values_to_read = batch_size;
+  ::arrow::StringBuilder builder(pool_, field_->type);
+  while ((values_to_read > 0) && column_reader_) {
+    values_buffer_.Resize(values_to_read * sizeof(ByteArray));
+    if (descr_->max_definition_level() > 0) {
+      def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
+    }
+    auto reader =
+        dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get());
+    int64_t values_read;
+    int64_t levels_read;
+    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data());
+    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+        values_to_read, def_levels, nullptr, values, &values_read));
+    values_to_read -= levels_read;
+    if (descr_->max_definition_level() == 0) {
+      for (int64_t i = 0; i < levels_read; i++) {
+        RETURN_NOT_OK(
+            builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
+      }
+    } else {
+      // descr_->max_definition_level() == 1
+      int values_idx = 0;
+      for (int64_t i = 0; i < levels_read; i++) {
+        if (def_levels[i] < descr_->max_definition_level()) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else {
+          RETURN_NOT_OK(
+              builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
+                  values[values_idx].len));
+          values_idx++;
+        }
+      }
+    }
+    if (!column_reader_->HasNext()) { NextRowGroup(); }
+  }
+  *out = builder.Finish();
+  return Status::OK();
+}
+
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)              \
+  case ::arrow::Type::ENUM:                                         \
+    return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
+    break;
+
+Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+  if (!column_reader_) {
+    // Exhausted all row groups.
+    *out = nullptr;
+    return Status::OK();
+  }
+
+  switch (field_->type->type) {
+    TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+    TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+    TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
+    TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+    TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
+    TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
+    TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+    TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+    TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
+    TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
+    TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+    TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
+    TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+    default:
+      return Status::NotImplemented(field_->type->ToString());
+  }
+}
+
+void FlatColumnReader::Impl::NextRowGroup() {
+  if (next_row_group_ < reader_->metadata()->num_row_groups()) {
+    column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_);
+    next_row_group_++;
+  } else {
+    column_reader_ = nullptr;
+  }
+}
+
+FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
+
+FlatColumnReader::~FlatColumnReader() {}
+
+Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+  return impl_->NextBatch(batch_size, out);
+}
+
+}  // namespace arrow
+}  // namespace parquet
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
new file mode 100644
index 00000000..e7257285
--- /dev/null
+++ b/src/parquet/arrow/reader.h
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_READER_H
+#define PARQUET_ARROW_READER_H
+
+#include <memory>
+
+#include "parquet/api/reader.h"
+#include "parquet/api/schema.h"
+#include "parquet/arrow/io.h"
+
+#include "arrow/io/interfaces.h"
+
+namespace arrow {
+
+class Array;
+class MemoryPool;
+class RowBatch;
+class Status;
+class Table;
+}
+
+namespace parquet {
+
+namespace arrow {
+
+class FlatColumnReader;
+
+// Arrow read adapter class for deserializing Parquet files as Arrow row
+// batches.
+//
+// TODO(wesm): nested data does not always make sense with this user
+// interface unless you are only reading a single leaf node from a branch of
+// a table. For example:
+//
+// repeated group data {
+//   optional group record {
+//     optional int32 val1;
+//     optional byte_array val2;
+//     optional bool val3;
+//   }
+//   optional int32 val4;
+// }
+//
+// In the Parquet file, there are 4 leaf nodes:
+//
+// * data.record.val1
+// * data.record.val2
+// * data.record.val3
+// * data.val4
+//
+// When materializing this data in an Arrow array, we would have:
+//
+// data: list<struct<
+//   record: struct<
+//    val1: int32,
+//    val2: string (= list<uint8>),
+//    val3: bool,
+//   >,
+//   val4: int32
+// >>
+//
+// However, in the Parquet format, each leaf node has its own repetition and
+// definition levels describing the structure of the intermediate nodes in
+// this array structure. Thus, we will need to scan the leaf data for a group
+// of leaf nodes part of the same type tree to create a single result Arrow
+// nested array structure.
+//
+// This is additionally complicated by "chunky" repeated fields or very large
+// byte arrays
+class PARQUET_EXPORT FileReader {
+ public:
+  FileReader(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader);
+
+  // Since the distribution of columns amongst a Parquet file's row groups may
+  // be uneven (the number of values in each column chunk can be different), we
+  // provide a column-oriented read interface. The ColumnReader hides the
+  // details of paging through the file's row groups and yielding
+  // fully-materialized arrow::Array instances
+  //
+  // Returns error status if the column of interest is not flat.
+  ::arrow::Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
+  // Read column as a whole into an Array.
+  ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out);
+  // Read a table of flat columns into a Table.
+  ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out);
+
+  virtual ~FileReader();
+
+ private:
+  class PARQUET_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+// At this point, the column reader is a stream iterator. It only knows how to
+// read the next batch of values for a particular column from the file until it
+// runs out.
+//
+// We also do not expose any internal Parquet details, such as row groups. This
+// might change in the future.
+class PARQUET_EXPORT FlatColumnReader {
+ public:
+  virtual ~FlatColumnReader();
+
+  // Scan the next array of the indicated size. The actual size of the
+  // returned array may be less than the passed size depending on how much
+  // data is available in the file.
+  //
+  // When all the data in the file has been exhausted, the result is set to
+  // nullptr.
+  //
+  // Returns Status::OK on a successful read, including if you have exhausted
+  // the data available in the file.
+  ::arrow::Status NextBatch(int batch_size, std::shared_ptr<::arrow::Array>* out);
+
+ private:
+  class PARQUET_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+  explicit FlatColumnReader(std::unique_ptr<Impl> impl);
+
+  friend class FileReader;
+};
+
+// Helper function to create a file reader from an implementation of an Arrow
+// readable file
+PARQUET_EXPORT
+::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+    ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader);
+
+}  // namespace arrow
+}  // namespace parquet
+
+#endif  // PARQUET_ARROW_READER_H
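The stream-iterator contract above implies a simple drain loop. A sketch of the intended pattern (editor's illustration; assumes a FileReader `reader` opened as via OpenFile, with an arbitrary column index and batch size):

// Editor's sketch, not part of the patch.
std::unique_ptr<::parquet::arrow::FlatColumnReader> column_reader;
RETURN_NOT_OK(reader->GetFlatColumn(0, &column_reader));

std::shared_ptr<::arrow::Array> batch;
RETURN_NOT_OK(column_reader->NextBatch(4096, &batch));
while (batch != nullptr) {
  // ... process up to 4096 values; short batches are possible ...
  RETURN_NOT_OK(column_reader->NextBatch(4096, &batch));
}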
+ +#include "parquet/arrow/schema.h" + +#include +#include + +#include "parquet/api/schema.h" +#include "parquet/arrow/utils.h" + +#include "arrow/types/decimal.h" +#include "arrow/types/string.h" +#include "arrow/util/status.h" + +using arrow::Field; +using arrow::Status; +using arrow::TypePtr; + +using ArrowType = arrow::Type; + +using parquet::Repetition; +using parquet::schema::Node; +using parquet::schema::NodePtr; +using parquet::schema::GroupNode; +using parquet::schema::PrimitiveNode; + +using ParquetType = parquet::Type; +using parquet::LogicalType; + +namespace parquet { + +namespace arrow { + +const auto BOOL = std::make_shared<::arrow::BooleanType>(); +const auto UINT8 = std::make_shared<::arrow::UInt8Type>(); +const auto INT8 = std::make_shared<::arrow::Int8Type>(); +const auto UINT16 = std::make_shared<::arrow::UInt16Type>(); +const auto INT16 = std::make_shared<::arrow::Int16Type>(); +const auto UINT32 = std::make_shared<::arrow::UInt32Type>(); +const auto INT32 = std::make_shared<::arrow::Int32Type>(); +const auto UINT64 = std::make_shared<::arrow::UInt64Type>(); +const auto INT64 = std::make_shared<::arrow::Int64Type>(); +const auto FLOAT = std::make_shared<::arrow::FloatType>(); +const auto DOUBLE = std::make_shared<::arrow::DoubleType>(); +const auto UTF8 = std::make_shared<::arrow::StringType>(); +const auto TIMESTAMP_MS = + std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI); +const auto BINARY = + std::make_shared<::arrow::ListType>(std::make_shared<::arrow::Field>("", UINT8)); + +TypePtr MakeDecimalType(const PrimitiveNode* node) { + int precision = node->decimal_metadata().precision; + int scale = node->decimal_metadata().scale; + return std::make_shared<::arrow::DecimalType>(precision, scale); +} + +static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) { + switch (node->logical_type()) { + case LogicalType::UTF8: + *out = UTF8; + break; + case LogicalType::DECIMAL: + *out = MakeDecimalType(node); + break; + default: + // BINARY + *out = BINARY; + break; + } + return Status::OK(); +} + +static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) { + switch (node->logical_type()) { + case LogicalType::NONE: + *out = BINARY; + break; + case LogicalType::DECIMAL: + *out = MakeDecimalType(node); + break; + default: + return Status::NotImplemented("unhandled type"); + break; + } + + return Status::OK(); +} + +static Status FromInt32(const PrimitiveNode* node, TypePtr* out) { + switch (node->logical_type()) { + case LogicalType::NONE: + *out = INT32; + break; + case LogicalType::UINT_8: + *out = UINT8; + break; + case LogicalType::INT_8: + *out = INT8; + break; + case LogicalType::UINT_16: + *out = UINT16; + break; + case LogicalType::INT_16: + *out = INT16; + break; + case LogicalType::UINT_32: + *out = UINT32; + break; + case LogicalType::DECIMAL: + *out = MakeDecimalType(node); + break; + default: + return Status::NotImplemented("Unhandled logical type for int32"); + break; + } + return Status::OK(); +} + +static Status FromInt64(const PrimitiveNode* node, TypePtr* out) { + switch (node->logical_type()) { + case LogicalType::NONE: + *out = INT64; + break; + case LogicalType::UINT_64: + *out = UINT64; + break; + case LogicalType::DECIMAL: + *out = MakeDecimalType(node); + break; + case LogicalType::TIMESTAMP_MILLIS: + *out = TIMESTAMP_MS; + break; + default: + return Status::NotImplemented("Unhandled logical type for int64"); + break; + } + return Status::OK(); +} + +// TODO: Logical Type Handling +Status NodeToField(const 
+Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {
+  std::shared_ptr<::arrow::DataType> type;
+
+  if (node->is_repeated()) {
+    return Status::NotImplemented("No support yet for repeated node types");
+  }
+
+  if (node->is_group()) {
+    const GroupNode* group = static_cast<const GroupNode*>(node.get());
+    std::vector<std::shared_ptr<Field>> fields(group->field_count());
+    for (int i = 0; i < group->field_count(); i++) {
+      RETURN_NOT_OK(NodeToField(group->field(i), &fields[i]));
+    }
+    type = std::make_shared<::arrow::StructType>(fields);
+  } else {
+    // Primitive (leaf) node
+    const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
+
+    switch (primitive->physical_type()) {
+      case ParquetType::BOOLEAN:
+        type = BOOL;
+        break;
+      case ParquetType::INT32:
+        RETURN_NOT_OK(FromInt32(primitive, &type));
+        break;
+      case ParquetType::INT64:
+        RETURN_NOT_OK(FromInt64(primitive, &type));
+        break;
+      case ParquetType::INT96:
+        // TODO: Do we have that type in Arrow?
+        // type = TypePtr(new Int96Type());
+        return Status::NotImplemented("int96");
+      case ParquetType::FLOAT:
+        type = FLOAT;
+        break;
+      case ParquetType::DOUBLE:
+        type = DOUBLE;
+        break;
+      case ParquetType::BYTE_ARRAY:
+        // TODO: Do we have that type in Arrow?
+        RETURN_NOT_OK(FromByteArray(primitive, &type));
+        break;
+      case ParquetType::FIXED_LEN_BYTE_ARRAY:
+        RETURN_NOT_OK(FromFLBA(primitive, &type));
+        break;
+    }
+  }
+
+  *out = std::make_shared<Field>(node->name(), type, !node->is_required());
+  return Status::OK();
+}
+
+Status FromParquetSchema(
+    const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) {
+  // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
+  // from the root Parquet node
+  const GroupNode* schema_node = parquet_schema->group_node();
+
+  std::vector<std::shared_ptr<Field>> fields(schema_node->field_count());
+  for (int i = 0; i < schema_node->field_count(); i++) {
+    RETURN_NOT_OK(NodeToField(schema_node->field(i), &fields[i]));
+  }
+
+  *out = std::make_shared<::arrow::Schema>(fields);
+  return Status::OK();
+}
+
+Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
+    const std::string& name, bool nullable, const WriterProperties& properties,
+    NodePtr* out) {
+  Repetition::type repetition = Repetition::REQUIRED;
+  if (nullable) { repetition = Repetition::OPTIONAL; }
+
+  std::vector<NodePtr> children(type->num_children());
+  for (int i = 0; i < type->num_children(); i++) {
+    RETURN_NOT_OK(FieldToNode(type->child(i), properties, &children[i]));
+  }
+
+  *out = GroupNode::Make(name, repetition, children);
+  return Status::OK();
+}
+Status FieldToNode(const std::shared_ptr<Field>& field,
+    const WriterProperties& properties, NodePtr* out) {
+  LogicalType::type logical_type = LogicalType::NONE;
+  ParquetType::type type;
+  Repetition::type repetition = Repetition::REQUIRED;
+  if (field->nullable) { repetition = Repetition::OPTIONAL; }
+  int length = -1;
+
+  switch (field->type->type) {
+    // TODO:
+    // case ArrowType::NA:
+    //   break;
+    case ArrowType::BOOL:
+      type = ParquetType::BOOLEAN;
+      break;
+    case ArrowType::UINT8:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::UINT_8;
+      break;
+    case ArrowType::INT8:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::INT_8;
+      break;
+    case ArrowType::UINT16:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::UINT_16;
+      break;
+    case ArrowType::INT16:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::INT_16;
+      break;
+    case ArrowType::UINT32:
+      if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
+        type = ParquetType::INT64;
+      } else {
+        type = ParquetType::INT32;
+        logical_type = LogicalType::UINT_32;
+      }
+      break;
+    case ArrowType::INT32:
+      type = ParquetType::INT32;
+      break;
+    case ArrowType::UINT64:
+      type = ParquetType::INT64;
+      logical_type = LogicalType::UINT_64;
+      break;
+    case ArrowType::INT64:
+      type = ParquetType::INT64;
+      break;
+    case ArrowType::FLOAT:
+      type = ParquetType::FLOAT;
+      break;
+    case ArrowType::DOUBLE:
+      type = ParquetType::DOUBLE;
+      break;
+    case ArrowType::STRING:
+      type = ParquetType::BYTE_ARRAY;
+      logical_type = LogicalType::UTF8;
+      break;
+    case ArrowType::BINARY:
+      type = ParquetType::BYTE_ARRAY;
+      break;
+    case ArrowType::DATE:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::DATE;
+      break;
+    case ArrowType::TIMESTAMP: {
+      auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type.get());
+      if (timestamp_type->unit != ::arrow::TimestampType::Unit::MILLI) {
+        return Status::NotImplemented(
+            "Timestamp units other than milliseconds are not yet supported with Parquet.");
+      }
+      type = ParquetType::INT64;
+      logical_type = LogicalType::TIMESTAMP_MILLIS;
+    } break;
+    case ArrowType::TIMESTAMP_DOUBLE:
+      type = ParquetType::INT64;
+      // This is specified as seconds since the UNIX epoch
+      // TODO: Converted type in Parquet?
+      // logical_type = LogicalType::TIMESTAMP_MILLIS;
+      break;
+    case ArrowType::TIME:
+      type = ParquetType::INT64;
+      logical_type = LogicalType::TIME_MILLIS;
+      break;
+    case ArrowType::STRUCT: {
+      auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type);
+      return StructToNode(struct_type, field->name, field->nullable, properties, out);
+    } break;
+    default:
+      // TODO: LIST, DENSE_UNION, SPARSE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
+      return Status::NotImplemented("unhandled type");
+  }
+  *out = PrimitiveNode::Make(field->name, repetition, type, logical_type, length);
+  return Status::OK();
+}
+
+Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
+    const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out) {
+  std::vector<NodePtr> nodes(arrow_schema->num_fields());
+  for (int i = 0; i < arrow_schema->num_fields(); i++) {
+    RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i]));
+  }
+
+  NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
+  *out = std::make_shared<::parquet::SchemaDescriptor>();
+  PARQUET_CATCH_NOT_OK((*out)->Init(schema));
+
+  return Status::OK();
+}
+
+}  // namespace arrow
+
+}  // namespace parquet
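The two conversion directions above are meant to round-trip for flat schemas. A minimal sketch of how they compose (editor's illustration; `arrow_schema` is an assumed, pre-existing ::arrow::Schema):

// Editor's sketch, not part of the patch.
std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema;
RETURN_NOT_OK(::parquet::arrow::ToParquetSchema(
    arrow_schema.get(), *::parquet::default_writer_properties(), &parquet_schema));

std::shared_ptr<::arrow::Schema> roundtripped;
RETURN_NOT_OK(::parquet::arrow::FromParquetSchema(parquet_schema.get(), &roundtripped));
// For flat schemas of supported types this should reproduce the input;
// UINT32 under Parquet 1.0 is a notable exception (stored as INT64, see above).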
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
new file mode 100644
index 00000000..6917b905
--- /dev/null
+++ b/src/parquet/arrow/schema.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_SCHEMA_H
+#define PARQUET_ARROW_SCHEMA_H
+
+#include <memory>
+
+#include "arrow/schema.h"
+#include "arrow/type.h"
+#include "arrow/util/visibility.h"
+
+#include "parquet/api/schema.h"
+#include "parquet/api/writer.h"
+
+namespace arrow {
+
+class Status;
+
+}  // namespace arrow
+
+namespace parquet {
+
+namespace arrow {
+
+::arrow::Status PARQUET_EXPORT NodeToField(
+    const schema::NodePtr& node, std::shared_ptr<::arrow::Field>* out);
+
+::arrow::Status PARQUET_EXPORT FromParquetSchema(
+    const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out);
+
+::arrow::Status PARQUET_EXPORT FieldToNode(const std::shared_ptr<::arrow::Field>& field,
+    const WriterProperties& properties, schema::NodePtr* out);
+
+::arrow::Status PARQUET_EXPORT ToParquetSchema(const ::arrow::Schema* arrow_schema,
+    const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out);
+
+}  // namespace arrow
+
+}  // namespace parquet
+
+#endif  // PARQUET_ARROW_SCHEMA_H
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
new file mode 100644
index 00000000..deac9f75
--- /dev/null
+++ b/src/parquet/arrow/test-util.h
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include "arrow/test-util.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+
+namespace parquet {
+
+namespace arrow {
+
+template <typename ArrowType>
+using is_arrow_float = std::is_floating_point<typename ArrowType::c_type>;
+
+template <typename ArrowType>
+using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
+
+template <typename ArrowType>
+using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;
+
+template <typename ArrowType>
+typename std::enable_if<is_arrow_float<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NonNullArray(size_t size) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::random_real(size, 0, 0, 1, &values);
+  ::arrow::NumericBuilder<ArrowType> builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+template <typename ArrowType>
+typename std::enable_if<is_arrow_int<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NonNullArray(size_t size) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::randint(size, 0, 64, &values);
+  ::arrow::NumericBuilder<ArrowType> builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+template <typename ArrowType>
+typename std::enable_if<is_arrow_string<ArrowType>::value,
+    std::shared_ptr<::arrow::StringArray>>::type
+NonNullArray(size_t size) {
+  ::arrow::StringBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>());
+  for (size_t i = 0; i < size; i++) {
+    builder.Append("test-string");
+  }
+  return std::static_pointer_cast<::arrow::StringArray>(builder.Finish());
+}
+
+template <>
+std::shared_ptr<::arrow::PrimitiveArray> NonNullArray<::arrow::BooleanType>(size_t size) {
+  std::vector<uint8_t> values;
+  ::arrow::test::randint(size, 0, 1, &values);
+  ::arrow::BooleanBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+// This helper function only supports (size/2) nulls.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_float<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::random_real(size, 0, 0, 1, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::NumericBuilder<ArrowType> builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+// This helper function only supports (size/2) nulls.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_int<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::randint(size, 0, 64, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::NumericBuilder<ArrowType> builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+// This helper function only supports (size/2) nulls.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_string<ArrowType>::value,
+    std::shared_ptr<::arrow::StringArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::StringBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>());
+  for (size_t i = 0; i < size; i++) {
+    builder.Append("test-string");
+  }
+  return std::static_pointer_cast<::arrow::StringArray>(builder.Finish());
+}
+
+// This helper function only supports (size/2) nulls.
+template <>
+std::shared_ptr<::arrow::PrimitiveArray> NullableArray<::arrow::BooleanType>(
+    size_t size, size_t num_nulls) {
+  std::vector<uint8_t> values;
+  ::arrow::test::randint(size, 0, 1, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::BooleanBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+std::shared_ptr<::arrow::Column> MakeColumn(const std::string& name,
+    const std::shared_ptr<::arrow::Array>& array, bool nullable) {
+  auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable);
+  return std::make_shared<::arrow::Column>(field, array);
+}
+
+std::shared_ptr<::arrow::Table> MakeSimpleTable(
+    const std::shared_ptr<::arrow::Array>& values, bool nullable) {
+  std::shared_ptr<::arrow::Column> column = MakeColumn("col", values, nullable);
+  std::vector<std::shared_ptr<::arrow::Column>> columns({column});
+  std::vector<std::shared_ptr<::arrow::Field>> fields({column->field()});
+  auto schema = std::make_shared<::arrow::Schema>(fields);
+  return std::make_shared<::arrow::Table>("table", schema, columns);
+}
+
+template <typename T>
+void ExpectArray(T* expected, ::arrow::Array* result) {
+  auto p_array = static_cast<::arrow::PrimitiveArray*>(result);
+  for (int i = 0; i < result->length(); i++) {
+    EXPECT_EQ(expected[i], reinterpret_cast<const T*>(p_array->data()->data())[i]);
+  }
+}
+
+template <typename ArrowType>
+void ExpectArray(typename ArrowType::c_type* expected, ::arrow::Array* result) {
+  ::arrow::PrimitiveArray* p_array = static_cast<::arrow::PrimitiveArray*>(result);
+  for (int64_t i = 0; i < result->length(); i++) {
+    EXPECT_EQ(expected[i],
+        reinterpret_cast<const typename ArrowType::c_type*>(p_array->data()->data())[i]);
+  }
+}
+
+template <>
+void ExpectArray<::arrow::BooleanType>(uint8_t* expected, ::arrow::Array* result) {
+  ::arrow::BooleanBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  builder.Append(expected, result->length());
+  std::shared_ptr<::arrow::Array> expected_array = builder.Finish();
+  EXPECT_TRUE(result->Equals(expected_array));
+}
+
+}  // namespace arrow
+
+}  // namespace parquet
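A typical use of these helpers in a test body might look like the following (editor's sketch; the sizes and null counts are arbitrary, and the nulls land on even indices as the helpers document):

// Editor's sketch, not part of the patch.
auto values = NonNullArray<::arrow::Int64Type>(100);
std::shared_ptr<::arrow::Table> table = MakeSimpleTable(values, false);

auto nullable_values = NullableArray<::arrow::Int64Type>(100, 50);
std::shared_ptr<::arrow::Table> nullable_table = MakeSimpleTable(nullable_values, true);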
diff --git a/src/parquet/arrow/utils.h b/src/parquet/arrow/utils.h
new file mode 100644
index 00000000..b443c998
--- /dev/null
+++ b/src/parquet/arrow/utils.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_UTILS_H
+#define PARQUET_ARROW_UTILS_H
+
+#include <sstream>
+
+#include "arrow/util/status.h"
+#include "parquet/exception.h"
+
+namespace parquet {
+namespace arrow {
+
+#define PARQUET_CATCH_NOT_OK(s)                    \
+  try {                                            \
+    (s);                                           \
+  } catch (const ::parquet::ParquetException& e) { \
+    return ::arrow::Status::Invalid(e.what());     \
+  }
+
+#define PARQUET_IGNORE_NOT_OK(s) \
+  try {                          \
+    (s);                         \
+  } catch (const ::parquet::ParquetException& e) {}
+
+#define PARQUET_THROW_NOT_OK(s)                    \
+  do {                                             \
+    ::arrow::Status _s = (s);                      \
+    if (!_s.ok()) {                                \
+      std::stringstream ss;                        \
+      ss << "Arrow error: " << _s.ToString();      \
+      throw ::parquet::ParquetException(ss.str()); \
+    }                                              \
+  } while (0);
+
+}  // namespace arrow
+}  // namespace parquet
+
+#endif  // PARQUET_ARROW_UTILS_H
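These macros bridge the two error-handling regimes at the module boundary: parquet-cpp throws ParquetException, while the Arrow-facing API returns ::arrow::Status. A sketch of the intended pattern (hypothetical helper, not part of the patch; metadata()->num_row_groups() is the same call the reader uses above):

// Editor's sketch, not part of the patch.
::arrow::Status CountRowGroups(::parquet::ParquetFileReader* reader, int* out) {
  // Any ParquetException thrown inside is converted to Status::Invalid.
  PARQUET_CATCH_NOT_OK(*out = reader->metadata()->num_row_groups());
  return ::arrow::Status::OK();
}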
+ +#include "parquet/arrow/writer.h" + +#include +#include + +#include "parquet/arrow/schema.h" +#include "parquet/arrow/utils.h" + +#include "arrow/array.h" +#include "arrow/column.h" +#include "arrow/table.h" +#include "arrow/types/construct.h" +#include "arrow/types/primitive.h" +#include "arrow/types/string.h" +#include "arrow/util/status.h" + +using arrow::MemoryPool; +using arrow::PoolBuffer; +using arrow::PrimitiveArray; +using arrow::Status; +using arrow::StringArray; +using arrow::Table; + +using parquet::ParquetFileWriter; +using parquet::ParquetVersion; +using parquet::schema::GroupNode; + +namespace parquet { +namespace arrow { + +class FileWriter::Impl { + public: + Impl(MemoryPool* pool, std::unique_ptr writer); + + Status NewRowGroup(int64_t chunk_size); + template + Status TypedWriteBatch(ColumnWriter* writer, const PrimitiveArray* data, + int64_t offset, int64_t length); + + // TODO(uwe): Same code as in reader.cc the only difference is the name of the temporary + // buffer + template + struct can_copy_ptr { + static constexpr bool value = + std::is_same::value || + (std::is_integral{} && std::is_integral{} && + (sizeof(InType) == sizeof(OutType))); + }; + + template ::value>::type* = nullptr> + Status ConvertPhysicalType(const InType* in_ptr, int64_t, const OutType** out_ptr) { + *out_ptr = reinterpret_cast(in_ptr); + return Status::OK(); + } + + template ::value>::type* = nullptr> + Status ConvertPhysicalType( + const InType* in_ptr, int64_t length, const OutType** out_ptr) { + RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType))); + OutType* mutable_out_ptr = reinterpret_cast(data_buffer_.mutable_data()); + std::copy(in_ptr, in_ptr + length, mutable_out_ptr); + *out_ptr = mutable_out_ptr; + return Status::OK(); + } + + Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length); + Status WriteFlatColumnChunk(const StringArray* data, int64_t offset, int64_t length); + Status Close(); + + virtual ~Impl() {} + + private: + friend class FileWriter; + + MemoryPool* pool_; + // Buffer used for storing the data of an array converted to the physical type + // as expected by parquet-cpp. 
+FileWriter::Impl::Impl(
+    MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
+    : pool_(pool),
+      data_buffer_(pool),
+      writer_(std::move(writer)),
+      row_group_writer_(nullptr) {}
+
+Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
+  if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+  PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup(chunk_size));
+  return Status::OK();
+}
+
+template <typename ParquetType, typename ArrowType>
+Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
+  using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+
+  DCHECK((offset + length) <= data->length());
+  auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset;
+  auto writer =
+      reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer);
+  if (writer->descr()->max_definition_level() == 0) {
+    // no nulls, just dump the data
+    const ParquetCType* data_writer_ptr = nullptr;
+    RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
+        data_ptr, length, &data_writer_ptr)));
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_writer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+    int16_t* def_levels_ptr =
+        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    if (data->null_count() == 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+      const ParquetCType* data_writer_ptr = nullptr;
+      RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
+          data_ptr, length, &data_writer_ptr)));
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, data_writer_ptr));
+    } else {
+      RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType)));
+      auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
+      int buffer_idx = 0;
+      for (int i = 0; i < length; i++) {
+        if (data->IsNull(offset + i)) {
+          def_levels_ptr[i] = 0;
+        } else {
+          def_levels_ptr[i] = 1;
+          buffer_ptr[buffer_idx++] = static_cast<ParquetCType>(data_ptr[i]);
+        }
+      }
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    }
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+// This specialization looks quite similar, but it differs in two significant
+// points:
+// * offset is applied to the pointer as late as possible, since we have
+//   sub-byte access
+// * Arrow boolean data is stored bit-packed, thus we cannot use std::copy to
+//   transform from ArrowType::c_type to ParquetType::c_type
+template <>
+Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
+    ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset,
+    int64_t length) {
+  DCHECK((offset + length) <= data->length());
+  RETURN_NOT_OK(data_buffer_.Resize(length));
+  auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data());
+  auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
+  auto writer = reinterpret_cast<TypedColumnWriter<BooleanType>*>(
+      column_writer);
+  if (writer->descr()->max_definition_level() == 0) {
+    // no nulls, just dump the data
+    for (int64_t i = 0; i < length; i++) {
+      buffer_ptr[i] = ::arrow::util::get_bit(data_ptr, offset + i);
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, buffer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+    int16_t* def_levels_ptr =
+        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    if (data->null_count() == 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+      for (int64_t i = 0; i < length; i++) {
+        buffer_ptr[i] = ::arrow::util::get_bit(data_ptr, offset + i);
+      }
+      // TODO(PARQUET-644): write boolean values as a packed bitmap
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    } else {
+      int buffer_idx = 0;
+      for (int i = 0; i < length; i++) {
+        if (data->IsNull(offset + i)) {
+          def_levels_ptr[i] = 0;
+        } else {
+          def_levels_ptr[i] = 1;
+          buffer_ptr[buffer_idx++] = ::arrow::util::get_bit(data_ptr, offset + i);
+        }
+      }
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    }
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+Status FileWriter::Impl::Close() {
+  if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+  PARQUET_CATCH_NOT_OK(writer_->Close());
+  return Status::OK();
+}
+
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)                            \
+  case ::arrow::Type::ENUM:                                                       \
+    return TypedWriteBatch<ParquetType, ArrowType>(writer, data, offset, length); \
+    break;
+
+Status FileWriter::Impl::WriteFlatColumnChunk(
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
+  ColumnWriter* writer;
+  PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
+  switch (data->type_enum()) {
+    TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+    TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+    TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
+    TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+    TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
+    case ::arrow::Type::UINT32:
+      if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
+        // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we
+        // need to use the larger Int64Type to store them losslessly.
+        return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(
+            writer, data, offset, length);
+      } else {
+        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(
+            writer, data, offset, length);
+      }
+    TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+    TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+    TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
+    TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+    TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
+    TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+    default:
+      return Status::NotImplemented(data->type()->ToString());
+  }
+}
+
+Status FileWriter::Impl::WriteFlatColumnChunk(
+    const StringArray* data, int64_t offset, int64_t length) {
+  ColumnWriter* column_writer;
+  PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+  DCHECK((offset + length) <= data->length());
+  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ByteArray)));
+  auto buffer_ptr = reinterpret_cast<ByteArray*>(data_buffer_.mutable_data());
+  auto values = std::dynamic_pointer_cast<PrimitiveArray>(data->values());
+  DCHECK(values != nullptr);
+  auto data_ptr = reinterpret_cast<const uint8_t*>(values->data()->data());
+  auto writer = reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(
+      column_writer);
+  if (writer->descr()->max_definition_level() > 0) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+  }
+  int16_t* def_levels_ptr = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+  if (writer->descr()->max_definition_level() == 0 || data->null_count() == 0) {
+    // no nulls, just dump the data
+    for (int64_t i = 0; i < length; i++) {
+      buffer_ptr[i] = ByteArray(
+          data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
+    }
+    if (writer->descr()->max_definition_level() > 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    int buffer_idx = 0;
+    for (int64_t i = 0; i < length; i++) {
+      if (data->IsNull(offset + i)) {
+        def_levels_ptr[i] = 0;
+      } else {
+        def_levels_ptr[i] = 1;
+        buffer_ptr[buffer_idx++] = ByteArray(
+            data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
+      }
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+FileWriter::FileWriter(
+    MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
+    : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
+
+Status FileWriter::NewRowGroup(int64_t chunk_size) {
+  return impl_->NewRowGroup(chunk_size);
+}
+
+Status FileWriter::WriteFlatColumnChunk(
+    const ::arrow::Array* array, int64_t offset, int64_t length) {
+  int64_t real_length = length;
+  if (length == -1) { real_length = array->length(); }
+  if (array->type_enum() == ::arrow::Type::STRING) {
+    auto string_array = dynamic_cast<const StringArray*>(array);
+    DCHECK(string_array);
+    return impl_->WriteFlatColumnChunk(string_array, offset, real_length);
+  } else {
+    auto primitive_array = dynamic_cast<const PrimitiveArray*>(array);
+    if (!primitive_array) {
+      return Status::NotImplemented("Table must consist of PrimitiveArray instances");
+    }
+    return impl_->WriteFlatColumnChunk(primitive_array, offset, real_length);
+  }
+}
+
+Status FileWriter::Close() {
+  return impl_->Close();
+}
+
+MemoryPool* FileWriter::memory_pool() const {
+  return impl_->pool_;
+}
+
+FileWriter::~FileWriter() {}
+Status WriteFlatTable(const Table* table, MemoryPool* pool,
+    const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
+    const std::shared_ptr<WriterProperties>& properties) {
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  RETURN_NOT_OK(
+      ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+  std::unique_ptr<ParquetFileWriter> parquet_writer =
+      ParquetFileWriter::Open(sink, schema_node, properties);
+  FileWriter writer(pool, std::move(parquet_writer));
+
+  // TODO(ARROW-232) Support writing chunked arrays.
+  for (int i = 0; i < table->num_columns(); i++) {
+    if (table->column(i)->data()->num_chunks() != 1) {
+      return Status::NotImplemented("No support for writing chunked arrays yet.");
+    }
+  }
+
+  for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) {
+    int64_t offset = chunk * chunk_size;
+    int64_t size = std::min(chunk_size, table->num_rows() - offset);
+    RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
+    for (int i = 0; i < table->num_columns(); i++) {
+      std::shared_ptr<::arrow::Array> array = table->column(i)->data()->chunk(0);
+      RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(array.get(), offset, size),
+          PARQUET_IGNORE_NOT_OK(writer.Close()));
+    }
+  }
+
+  return writer.Close();
+}
+
+}  // namespace arrow
+
+}  // namespace parquet
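A minimal write-side sketch to mirror the read example earlier (editor's illustration; `table` is assumed to exist, and InMemoryOutputStream is the in-memory sink parquet-cpp uses in its own tests):

// Editor's sketch, not part of the patch.
std::shared_ptr<InMemoryOutputStream> sink = std::make_shared<InMemoryOutputStream>();
RETURN_NOT_OK(::parquet::arrow::WriteFlatTable(table.get(),
    ::arrow::default_memory_pool(), sink, table->num_rows(),
    ::parquet::default_writer_properties()));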
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
new file mode 100644
index 00000000..92524d8c
--- /dev/null
+++ b/src/parquet/arrow/writer.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_WRITER_H
+#define PARQUET_ARROW_WRITER_H
+
+#include <memory>
+
+#include "parquet/api/schema.h"
+#include "parquet/api/writer.h"
+
+namespace arrow {
+
+class Array;
+class MemoryPool;
+class PrimitiveArray;
+class RowBatch;
+class Status;
+class StringArray;
+class Table;
+}
+
+namespace parquet {
+
+namespace arrow {
+
+/**
+ * Iterative API:
+ *  Start a new RowGroup/Chunk with NewRowGroup.
+ *  Write column-by-column the whole column chunk.
+ */
+class PARQUET_EXPORT FileWriter {
+ public:
+  FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
+
+  ::arrow::Status NewRowGroup(int64_t chunk_size);
+  ::arrow::Status WriteFlatColumnChunk(
+      const ::arrow::Array* data, int64_t offset = 0, int64_t length = -1);
+  ::arrow::Status Close();
+
+  virtual ~FileWriter();
+
+  ::arrow::MemoryPool* memory_pool() const;
+
+ private:
+  class PARQUET_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+/**
+ * Write a flat Table to Parquet.
+ *
+ * The table shall only consist of nullable, non-repeated columns of primitive type.
+ */
+::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
+    ::arrow::MemoryPool* pool, const std::shared_ptr<OutputStream>& sink,
+    int64_t chunk_size,
+    const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
+
+}  // namespace arrow
+
+}  // namespace parquet
+
+#endif  // PARQUET_ARROW_WRITER_H
diff --git a/thirdparty/build_thirdparty.sh b/thirdparty/build_thirdparty.sh
index dca586c6..82294848 100755
--- a/thirdparty/build_thirdparty.sh
+++ b/thirdparty/build_thirdparty.sh
@@ -15,6 +15,7 @@ else
   # Allow passing specific libs to build on the command line
   for arg in "$*"; do
     case $arg in
+      "arrow")      F_ARROW=1 ;;
       "zlib")      F_ZLIB=1 ;;
       "gbenchmark")      F_GBENCHMARK=1 ;;
       "gtest")      F_GTEST=1 ;;
@@ -57,6 +58,15 @@ fi
 
 STANDARD_DARWIN_FLAGS="-std=c++11 -stdlib=libc++"
 
+# build arrow
+if [ -n "$F_ALL" -o -n "$F_ARROW" ]; then
+  cd $TP_DIR/$ARROW_BASEDIR/cpp
+  source ./setup_build_env.sh
+  cmake . -DARROW_PARQUET=OFF -DARROW_HDFS=ON -DCMAKE_INSTALL_PREFIX=$PREFIX
+  make -j$PARALLEL install
+  # :
+fi
+
 # build googletest
 GOOGLETEST_ERROR="failed for googletest!"
 if [ -n "$F_ALL" -o -n "$F_GTEST" ]; then
diff --git a/thirdparty/download_thirdparty.sh b/thirdparty/download_thirdparty.sh
index fae6c9cc..23bdb968 100755
--- a/thirdparty/download_thirdparty.sh
+++ b/thirdparty/download_thirdparty.sh
@@ -20,6 +20,11 @@ download_extract_and_cleanup() {
   rm $filename
 }
 
+if [ ! -d ${ARROW_BASEDIR} ]; then
+  echo "Fetching arrow"
+  download_extract_and_cleanup $ARROW_URL
+fi
+
 if [ ! -d ${SNAPPY_BASEDIR} ]; then
   echo "Fetching snappy"
   download_extract_and_cleanup $SNAPPY_URL
diff --git a/thirdparty/set_thirdparty_env.sh b/thirdparty/set_thirdparty_env.sh
index 80715ef7..3733d088 100644
--- a/thirdparty/set_thirdparty_env.sh
+++ b/thirdparty/set_thirdparty_env.sh
@@ -7,6 +7,7 @@ if [ -z "$THIRDPARTY_DIR" ]; then
   THIRDPARTY_DIR=$SOURCE_DIR
 fi
 
+export ARROW_HOME=$THIRDPARTY_DIR/installed
 export SNAPPY_HOME=$THIRDPARTY_DIR/installed
 export ZLIB_HOME=$THIRDPARTY_DIR/installed
 # build script doesn't support building thrift on OSX
diff --git a/thirdparty/versions.sh b/thirdparty/versions.sh
index b56262a8..05f5cc23 100755
--- a/thirdparty/versions.sh
+++ b/thirdparty/versions.sh
@@ -1,3 +1,7 @@
+ARROW_VERSION="6b8abb4402ff1f39fc5944a7df6e3b4755691d87"
+ARROW_URL="https://github.com/apache/arrow/archive/${ARROW_VERSION}.tar.gz"
+ARROW_BASEDIR="arrow-${ARROW_VERSION}"
+
 SNAPPY_VERSION=1.1.3
 SNAPPY_URL="https://github.com/google/snappy/releases/download/${SNAPPY_VERSION}/snappy-${SNAPPY_VERSION}.tar.gz"
 SNAPPY_BASEDIR=snappy-$SNAPPY_VERSION