From 2917a62a7a2761d41f4e8a48319d35566abdbd3d Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Fri, 22 Sep 2017 11:37:59 -0400 Subject: [PATCH 01/30] PARQUET-1095: [C++] Read and write Arrow decimal values --- cmake_modules/ThirdpartyToolchain.cmake | 2 +- data/fixed_length_decimal.parquet | Bin 0 -> 677 bytes data/fixed_length_decimal_legacy.parquet | Bin 0 -> 537 bytes data/int32_decimal.parquet | Bin 0 -> 478 bytes data/int64_decimal.parquet | Bin 0 -> 591 bytes src/parquet/arrow/arrow-reader-writer-test.cc | 124 +++++++-- src/parquet/arrow/arrow-schema-test.cc | 4 +- src/parquet/arrow/reader.cc | 235 +++++++++++++++++- src/parquet/arrow/record_reader.cc | 2 +- src/parquet/arrow/schema.cc | 104 +++++++- src/parquet/arrow/schema.h | 2 + src/parquet/arrow/test-util.h | 42 ++++ src/parquet/arrow/writer.cc | 88 +++++-- src/parquet/encoding-internal.h | 4 +- src/parquet/file/reader.cc | 18 +- 15 files changed, 560 insertions(+), 65 deletions(-) create mode 100644 data/fixed_length_decimal.parquet create mode 100644 data/fixed_length_decimal_legacy.parquet create mode 100644 data/int32_decimal.parquet create mode 100644 data/int64_decimal.parquet diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index a470fc18..601b4180 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -366,7 +366,7 @@ if (NOT ARROW_FOUND) -DARROW_BUILD_TESTS=OFF) if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "") - set(ARROW_VERSION "0e21f84c2fc26dba949a03ee7d7ebfade0a65b81") # Arrow 0.7.1 + set(ARROW_VERSION "b2596f66ac2a8aa66da81c1f75c36fa30c40d0df") else() set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}") endif() diff --git a/data/fixed_length_decimal.parquet b/data/fixed_length_decimal.parquet new file mode 100644 index 0000000000000000000000000000000000000000..69fce531e4d8e40890e28d9bcbb74d835971be75 GIT binary patch literal 677 zcmZY7OG?8)7y#gDtTjI9$|(tC5e6(R#E?YO7cO+C;G(z^MeHP*XmQfkJbX~P^8hYg 
zxby%XzyrAS03N^txD;_(+L?l30$=9;=bxX+wD*o|2;g&`4Fg(?1t@FEDwsXXow*P2 zUK-C!?6q>U3;+Vbcg~z4_$c_zDIE*GaW3eB4}xEu`9r~1&c$`XJHb!R!mi*8=aMW) z1wT0DZNVqOAI{|tlvHDXEg9hW>OAo;Nra&UU=fy}q%7WGnvsnan1>E1S*oULWm$to z)s+iCorWQclz#JUl;xed)Bb<-s>Xg0VC4ZAFR*~BDb;>_Vd4R1JDZax%;cJytefkM zq-NZ|H6xPZ0H;{-hA6!s5Y$3R8fSir3>0<=4HDEk9ikqN#-T~aEHcduh~JGctvIgX zRM~VdqZE6TWwH{)K(pegH5`r3MnE~3ZoL`B`VEPbZojA7X2W#!ikEe1p!+q~aq5jm m&1n+1X<>JZc=cwZzU?*Lu;y=tp>2nD&98zJcE{T6g8l(ebZuw= literal 0 HcmV?d00001 diff --git a/data/fixed_length_decimal_legacy.parquet b/data/fixed_length_decimal_legacy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..b0df62a2e1c8f6d543bc320bb1e5c2810338fb7d GIT binary patch literal 537 zcmZwFJx;?g6aZj1v?WET6Rr}JA+ki(N+C+prhj694J3w&2?-(UBo0(K{mD-ORoyrM zBL`q)-~b$;BM0CB9DoTiV4JcLIbWQ-r}v!1{eu%50{ASkVL*#bi%`~-B49^p@OuTz z3cr=uY329~1i~dZX9ypJpIqrw_{v?=h4;b_ZvI&K!YynGZ-wvN;=b^SyDkf<@Qo`s zg^$8-ZfO7|)i~TtCOE#nNc<}jVJHDuhMQ1Q3NuVIvapQvdRpIKa4P~*$u;~v&W;1QtVNd z$x09d{S`;O`C@Ut0Lsy-^==gFGZH7m@kqDLj_K$%FB{T8_gk*xv^$-a(!8p6aG!08AA3n{F8}}l literal 0 HcmV?d00001 diff --git a/data/int32_decimal.parquet b/data/int32_decimal.parquet new file mode 100644 index 0000000000000000000000000000000000000000..5bf2d4ea3845820abb3c5e89b486d0211d2c1804 GIT binary patch literal 478 zcmZ9JPfpuV5XL7a32w!b7uk{*){}yWWGvgn`6m{z1BnIds!~;z7yB6^o|E7|X;G9N z2SBViKo8IZZ~zu8I6x1;0a&u=mndv1%};u7zR`R$nzN%XJ|p&AK{(9Av(9wOsGff! 
zGO95VH2`VAt1Q1yz^M@3fj96K(j%}ph;G3vIIR->0oygAU*In|tP}kP`X-SAFQBax zJ%D$xG-R6T9B<`QkzM`Bqf40zrXktXjM_}7QZ_%cD$}XqnU-no=oacjy-wy@W>6aU z8e&I3_-_T%!fNP+>6axQnU>L-WEZZO3OKndQ#Zd>F5It6S-9Ecr<=+`#G(*}nDgRh zD*1rtMOH=ycX%?AD$e=f+nkR@x|CKkLRnYFG8&~q?Y5nuh3vRgN(>bW(Lp-=qc&tYcYI_yatim5cQT)T?Fw0Q?3XN~Qh^T(6b+9q<#lTQBt&puAk> z9QXm;mP`E^_zPUT6t1Yo{#rJs>D5^lUb2MZ3c}JxMXB9TUa*Z-Ea7$3uvAUenzDu^ z)s^c=1se;e@l_SODewGqRjMYg5{b45%}On#YD#OAUYK-D#YA&H+{~`IDNb=+u-r^X zw`Rg}8qu68!Gz@ZV@7%;%hMvvi9zB4<55O>ClfNH$;_IcRX9)?v2c)3zG+*A(-QKq z;G70rREieNAdBXtH@_rjE1^xc1=g$42l7d^w4u1r_=3vj?Y}* gqHdc79lzVz4tzKE!gd_nc5HiL3mu6s-WR{pA1Og=zyJUM literal 0 HcmV?d00001 diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index a18c5650..b068ff7d 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -37,6 +37,7 @@ #include "arrow/api.h" #include "arrow/test-util.h" +#include "arrow/util/decimal.h" using arrow::Array; using arrow::ArrayVisitor; @@ -68,10 +69,10 @@ using ColumnVector = std::vector>; namespace parquet { namespace arrow { -const int SMALL_SIZE = 100; -const int LARGE_SIZE = 10000; +static constexpr int SMALL_SIZE = 100; +static constexpr int LARGE_SIZE = 10000; -constexpr uint32_t kDefaultSeed = 0; +static constexpr uint32_t kDefaultSeed = 0; LogicalType::type get_logical_type(const ::arrow::DataType& type) { switch (type.id()) { @@ -118,6 +119,8 @@ LogicalType::type get_logical_type(const ::arrow::DataType& type) { static_cast(type); return get_logical_type(*dict_type.dictionary()->type()); } + case ArrowId::DECIMAL: + return LogicalType::DECIMAL; default: break; } @@ -147,6 +150,7 @@ ParquetType::type get_physical_type(const ::arrow::DataType& type) { case ArrowId::STRING: return ParquetType::BYTE_ARRAY; case ArrowId::FIXED_SIZE_BINARY: + case ArrowId::DECIMAL: return ParquetType::FIXED_LEN_BYTE_ARRAY; case ArrowId::DATE32: return ParquetType::INT32; @@ -296,9 +300,18 @@ struct 
test_traits<::arrow::FixedSizeBinaryType> { static std::string const value; }; +template <> +struct test_traits<::arrow::DecimalType> { + static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY; + static ::arrow::Decimal128 const value; +}; + const std::string test_traits<::arrow::StringType>::value("Test"); // NOLINT const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03"); // NOLINT const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed"); // NOLINT +const ::arrow::Decimal128 test_traits<::arrow::DecimalType>::value( + "-83095209205923957.2323995"); // NOLINT + template using ParquetDataType = DataType::parquet_enum>; @@ -342,28 +355,44 @@ void DoSimpleRoundtrip(const std::shared_ptr& table, int num_threads, static std::shared_ptr MakeSimpleSchema(const ::arrow::DataType& type, Repetition::type repetition) { - int byte_width; - // Decimal is not implemented yet. + int32_t byte_width = -1; + int32_t precision = -1; + int32_t scale = -1; + switch (type.id()) { case ::arrow::Type::DICTIONARY: { - const ::arrow::DictionaryType& dict_type = - static_cast(type); + const auto& dict_type = static_cast(type); const ::arrow::DataType& values_type = *dict_type.dictionary()->type(); - if (values_type.id() == ::arrow::Type::FIXED_SIZE_BINARY) { - byte_width = - static_cast(values_type).byte_width(); - } else { - byte_width = -1; + switch (values_type.id()) { + case ::arrow::Type::FIXED_SIZE_BINARY: + byte_width = + static_cast(values_type).byte_width(); + break; + case ::arrow::Type::DECIMAL: { + const auto& decimal_type = + static_cast(values_type); + precision = decimal_type.precision(); + scale = decimal_type.scale(); + byte_width = DecimalSize(precision); + } break; + default: + break; } } break; case ::arrow::Type::FIXED_SIZE_BINARY: byte_width = static_cast(type).byte_width(); break; + case ::arrow::Type::DECIMAL: { + const auto& decimal_type = static_cast(type); + precision = decimal_type.precision(); 
+ scale = decimal_type.scale(); + byte_width = DecimalSize(precision); + } break; default: - byte_width = -1; + break; } auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type), - get_logical_type(type), byte_width); + get_logical_type(type), byte_width, precision, scale); NodePtr node_ = GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); return std::static_pointer_cast(node_); @@ -371,7 +400,7 @@ static std::shared_ptr MakeSimpleSchema(const ::arrow::DataType& type namespace internal { -void AssertArraysEqual(const Array &expected, const Array &actual) { +void AssertArraysEqual(const Array& expected, const Array& actual) { if (!actual.Equals(expected)) { std::stringstream pp_result; std::stringstream pp_expected; @@ -526,11 +555,11 @@ class TestParquetIO : public ::testing::Test { // There we write an UInt32 Array but receive an Int64 Array as result for // Parquet version 1.0. -typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, - ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, - ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::Date32Type, - ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType, - ::arrow::BinaryType, ::arrow::FixedSizeBinaryType> +typedef ::testing::Types< + ::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, + ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, + ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType, + ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, ::arrow::DecimalType> TestTypes; TYPED_TEST_CASE(TestParquetIO, TestTypes); @@ -1889,6 +1918,61 @@ TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) { ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table)); } +class TestArrowReaderAdHocSpark + : public ::testing::TestWithParam< + std::tuple>> {}; + +TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) { + std::string path(std::getenv("PARQUET_TEST_DATA")); 
+ + std::string filename; + std::shared_ptr<::arrow::DataType> decimal_type; + std::tie(filename, decimal_type) = GetParam(); + + path += "/" + filename; + ASSERT_GT(path.size(), 0); + + auto pool = ::arrow::default_memory_pool(); + + std::unique_ptr arrow_reader; + ASSERT_NO_THROW( + arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false)))); + std::shared_ptr<::arrow::Table> table; + ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table)); + + ASSERT_EQ(1, table->num_columns()); + + constexpr int32_t expected_length = 24; + + auto value_column = table->column(0); + ASSERT_EQ(expected_length, value_column->length()); + + auto raw_array = value_column->data(); + ASSERT_EQ(1, raw_array->num_chunks()); + + auto chunk = raw_array->chunk(0); + + std::shared_ptr expected_array; + + ::arrow::DecimalBuilder builder(decimal_type, pool); + + for (int32_t i = 0; i < expected_length; ++i) { + ::arrow::Decimal128 value((i + 1) * 100); + ASSERT_OK(builder.Append(value)); + } + ASSERT_OK(builder.Finish(&expected_array)); + + internal::AssertArraysEqual(*expected_array, *chunk); +} + +INSTANTIATE_TEST_CASE_P( + ReadDecimals, TestArrowReaderAdHocSpark, + ::testing::Values( + std::make_tuple("int32_decimal.parquet", ::arrow::decimal(4, 2)), + std::make_tuple("int64_decimal.parquet", ::arrow::decimal(10, 2)), + std::make_tuple("fixed_length_decimal.parquet", ::arrow::decimal(25, 2)), + std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal(13, 2)))); + } // namespace arrow } // namespace parquet diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index a7a62c57..07e72907 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -62,8 +62,8 @@ class TestConvertParquetSchema : public ::testing::Test { for (int i = 0; i < expected_schema->num_fields(); ++i) { auto lhs = result_schema_->field(i); auto rhs = expected_schema->field(i); - EXPECT_TRUE(lhs->Equals(rhs)) << 
i << " " << lhs->ToString() - << " != " << rhs->ToString(); + EXPECT_TRUE(lhs->Equals(rhs)) + << i << " " << lhs->ToString() << " != " << rhs->ToString(); } } diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 5edc8379..53273f71 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -29,6 +29,7 @@ #include "arrow/api.h" #include "arrow/util/bit-util.h" +#include "arrow/util/decimal.h" #include "arrow/util/logging.h" #include "arrow/util/parallel.h" @@ -716,7 +717,8 @@ struct supports_fast_path_impl { template using supports_fast_path = - typename std::enable_if::value>::type; + typename std::enable_if::value, + ParquetType>::type; template struct TransferFunctor { @@ -874,6 +876,218 @@ struct TransferFunctor< } }; +static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) { + const int32_t length = stop - start; + + DCHECK_GE(length, 0); + DCHECK_LE(length, 8); + + switch (length) { + // We can forego the loop if the number of bytes to convert is a power of two + case 0: + return 0; + case 2: + return ::arrow::BitUtil::FromBigEndian( + *reinterpret_cast(bytes + start)); + case 4: + return ::arrow::BitUtil::FromBigEndian( + *reinterpret_cast(bytes + start)); + case 8: + return ::arrow::BitUtil::FromBigEndian( + *reinterpret_cast(bytes + start)); + default: { + // Take a slower path for non power-of-2 number of bytes + uint64_t value = 0; + + const auto unsigned_stop = static_cast(stop); + + for (int32_t i = start; i < stop; ++i) { + const uint64_t bits_to_shift = (unsigned_stop - i - 1) * CHAR_BIT; + const uint64_t byte_value = bytes[i]; + const uint64_t shifted_value = byte_value << bits_to_shift; + value |= shifted_value; + } + + return value; + } + } +} + +static constexpr int32_t kMinDecimalBytes = 1; +static constexpr int32_t kMaxDecimalBytes = 16; + +/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) and one +/// uint64_t (low bits). 
+static void BytesToIntegerPair(const uint8_t* bytes, + const int32_t total_number_of_bytes_used, int64_t* high, + uint64_t* low) { + DCHECK_GE(total_number_of_bytes_used, kMinDecimalBytes); + DCHECK_LE(total_number_of_bytes_used, kMaxDecimalBytes); + + /// Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the + /// sign bit. + const bool is_negative = static_cast(bytes[0]) < 0; + + /// Sign extend the low bits if necessary + *low = UINT64_MAX * (is_negative && total_number_of_bytes_used < 8); + *high = -1 * (is_negative && total_number_of_bytes_used < kMaxDecimalBytes); + + /// Stop byte of the high bytes + const int32_t high_bits_offset = std::max(0, total_number_of_bytes_used - 8); + + /// Shift left enough bits to make room for the incoming int64_t + *high <<= high_bits_offset * CHAR_BIT; + + /// Preserve the upper bits by inplace OR-ing the int64_t + *high |= BytesToInteger(bytes, 0, high_bits_offset); + + /// Stop byte of the low bytes + const int32_t low_bits_offset = std::min(total_number_of_bytes_used, 8); + + /// Shift left enough bits to make room for the incoming uint64_t + *low <<= low_bits_offset * CHAR_BIT; + + /// Preserve the upper bits by inplace OR-ing the uint64_t + *low |= BytesToInteger(bytes, high_bits_offset, total_number_of_bytes_used); +} + +static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width, + uint8_t* out_buf) { + // view the first 8 bytes as an unsigned 64-bit integer + auto low = reinterpret_cast(out_buf); + + // view the second 8 bytes as a signed 64-bit integer + auto high = reinterpret_cast(out_buf + sizeof(uint64_t)); + + // Convert the fixed size binary array bytes into a Decimal128 compatible layout + BytesToIntegerPair(value, byte_width, high, low); +} + +/// \brief Convert an array of FixedLenByteArrays to an arrow::DecimalArray +/// We do this by: +/// 1. Creating a arrow::FixedSizeBinaryArray from the RecordReader's builder +/// 2. 
Allocating a buffer for the arrow::DecimalArray +/// 3. Converting the big-endian bytes in the FixedSizeBinaryArray to two integers +/// representing the high and low bits of each decimal value. +template <> +struct TransferFunctor<::arrow::DecimalType, FLBAType> { + Status operator()(RecordReader* reader, MemoryPool* pool, + const std::shared_ptr<::arrow::DataType>& type, + std::shared_ptr* out) { + DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL); + + // Finish the built data into a temporary array + std::shared_ptr array; + RETURN_NOT_OK(reader->builder()->Finish(&array)); + const auto& fixed_size_binary_array = + static_cast(*array); + + // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time + // this will be different from the decimal array width because we write the minimum + // number of bytes necessary to represent a given precision + const int32_t byte_width = + static_cast(*fixed_size_binary_array.type()) + .byte_width(); + + // The byte width of each decimal value + const int32_t type_length = + static_cast(*type).byte_width(); + + // number of elements in the entire array + const int64_t length = fixed_size_binary_array.length(); + + // allocate memory for the decimal array + std::shared_ptr data; + RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data)); + + // raw bytes that we can write to + uint8_t* out_ptr = data->mutable_data(); + + // convert each FixedSizeBinary value to valid decimal bytes + const int64_t null_count = fixed_size_binary_array.null_count(); + if (null_count > 0) { + for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { + if (!fixed_size_binary_array.IsNull(i)) { + RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, + out_ptr); + } + } + } else { + for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { + RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr); + } + } + + *out = std::make_shared<::arrow::DecimalArray>( + type, 
length, data, fixed_size_binary_array.null_bitmap(), null_count); + return Status::OK(); + } +}; + +/// \brief Convert an Int32 or Int64 array into a DecimalArray +/// The parquet spec allows systems to write decimals in int32, int64 if the values are +/// small enough to fit in at most 4 bytes or at most 8 bytes, respectively. +/// This function implements the conversion from int32 and int64 arrays to decimal arrays. +template ::value || + std::is_same::value>::type> +static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, + const std::shared_ptr<::arrow::DataType>& type, + std::shared_ptr* out) { + DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL); + + const int64_t length = reader->values_written(); + + using ElementType = typename ParquetIntegerType::c_type; + static_assert(std::is_signed::value, "ElementType is not signed"); + + const auto values = reinterpret_cast(reader->values()); + + const auto& decimal_type = static_cast(*type); + const int64_t type_length = decimal_type.byte_width(); + + std::shared_ptr data; + RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data)); + uint8_t* out_ptr = data->mutable_data(); + + for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { + const ElementType value = values[i]; + const uint64_t raw[] = { + ::arrow::BitUtil::ToLittleEndian(static_cast(value)), + ::arrow::BitUtil::ToLittleEndian(static_cast(value < 0 ? 
-1 : 0))}; + const auto bytes = reinterpret_cast(raw); + std::copy(bytes, bytes + type_length, out_ptr); + } + + if (reader->nullable_values()) { + std::shared_ptr is_valid = reader->ReleaseIsValid(); + *out = std::make_shared<::arrow::DecimalArray>(type, length, data, is_valid, + reader->null_count()); + } else { + *out = std::make_shared<::arrow::DecimalArray>(type, length, data); + } + return Status::OK(); +} + +template <> +struct TransferFunctor<::arrow::DecimalType, Int32Type> { + Status operator()(RecordReader* reader, MemoryPool* pool, + const std::shared_ptr<::arrow::DataType>& type, + std::shared_ptr* out) { + return DecimalIntegerTransfer(reader, pool, type, out); + } +}; + +template <> +struct TransferFunctor<::arrow::DecimalType, Int64Type> { + Status operator()(RecordReader* reader, MemoryPool* pool, + const std::shared_ptr<::arrow::DataType>& type, + std::shared_ptr* out) { + return DecimalIntegerTransfer(reader, pool, type, out); + } +}; + #define TRANSFER_DATA(ArrowType, ParquetType) \ TransferFunctor func; \ RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), out)); \ @@ -932,6 +1146,22 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr* TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type) TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type) TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType) + case ::arrow::Type::DECIMAL: { + switch (descr_->physical_type()) { + case ::parquet::Type::INT32: { + TRANSFER_DATA(::arrow::DecimalType, Int32Type); + } break; + case ::parquet::Type::INT64: { + TRANSFER_DATA(::arrow::DecimalType, Int64Type); + } break; + case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { + TRANSFER_DATA(::arrow::DecimalType, FLBAType); + } break; + default: + return Status::Invalid( + "Physical type for decimal must be int32, int64, or fixed length binary"); + } + } break; case ::arrow::Type::TIMESTAMP: { ::arrow::TimestampType* timestamp_type = 
static_cast<::arrow::TimestampType*>(field_->type().get()); @@ -946,8 +1176,7 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr* default: return Status::NotImplemented("TimeUnit not supported"); } - break; - } + } break; TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type) TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type) default: diff --git a/src/parquet/arrow/record_reader.cc b/src/parquet/arrow/record_reader.cc index 7275d2f8..6405ee77 100644 --- a/src/parquet/arrow/record_reader.cc +++ b/src/parquet/arrow/record_reader.cc @@ -83,7 +83,7 @@ class RecordReader::RecordReaderImpl { Reset(); } - virtual ~RecordReaderImpl() {} + virtual ~RecordReaderImpl() = default; virtual int64_t ReadRecords(int64_t num_records) = 0; diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index e16a1afb..7027cb3a 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -25,6 +25,7 @@ #include "parquet/util/schema-util.h" #include "arrow/api.h" +#include "arrow/util/logging.h" using arrow::Field; using arrow::Status; @@ -50,9 +51,8 @@ const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO); const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO); TypePtr MakeDecimalType(const PrimitiveNode& node) { - int precision = node.decimal_metadata().precision; - int scale = node.decimal_metadata().scale; - return std::make_shared<::arrow::DecimalType>(precision, scale); + const auto& metadata = node.decimal_metadata(); + return ::arrow::decimal(metadata.precision, metadata.scale); } static Status FromByteArray(const PrimitiveNode& node, TypePtr* out) { @@ -473,7 +473,10 @@ Status FieldToNode(const std::shared_ptr& field, ParquetType::type type; Repetition::type repetition = field->nullable() ? 
Repetition::OPTIONAL : Repetition::REQUIRED; + int length = -1; + int precision = -1; + int scale = -1; switch (field->type()->id()) { case ArrowType::NA: @@ -532,9 +535,17 @@ Status FieldToNode(const std::shared_ptr& field, break; case ArrowType::FIXED_SIZE_BINARY: { type = ParquetType::FIXED_LEN_BYTE_ARRAY; - auto fixed_size_binary_type = - static_cast<::arrow::FixedSizeBinaryType*>(field->type().get()); - length = fixed_size_binary_type->byte_width(); + const auto& fixed_size_binary_type = + static_cast(*field->type()); + length = fixed_size_binary_type.byte_width(); + } break; + case ArrowType::DECIMAL: { + type = ParquetType::FIXED_LEN_BYTE_ARRAY; + logical_type = LogicalType::DECIMAL; + const auto& decimal_type = static_cast(*field->type()); + precision = decimal_type.precision(); + scale = decimal_type.scale(); + length = DecimalSize(precision); } break; case ArrowType::DATE32: type = ParquetType::INT32; @@ -565,12 +576,12 @@ Status FieldToNode(const std::shared_ptr& field, auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type()); return StructToNode(struct_type, field->name(), field->nullable(), properties, arrow_properties, out); - } break; + } case ArrowType::LIST: { auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type()); return ListToNode(list_type, field->name(), field->nullable(), properties, arrow_properties, out); - } break; + } case ArrowType::DICTIONARY: { // Parquet has no Dictionary type, dictionary-encoded is handled on // the encoding, not the schema level. 
@@ -582,14 +593,15 @@ Status FieldToNode(const std::shared_ptr& field, return FieldToNode(unpacked_field, properties, arrow_properties, out); } default: { - // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR + // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR std::stringstream ss; ss << "Unhandled type for Arrow to Parquet schema conversion: "; ss << field->type()->ToString(); return Status::NotImplemented(ss.str()); } } - *out = PrimitiveNode::Make(field->name(), repetition, type, logical_type, length); + *out = PrimitiveNode::Make(field->name(), repetition, type, logical_type, length, + precision, scale); return Status::OK(); } @@ -617,5 +629,77 @@ Status ToParquetSchema(const ::arrow::Schema* arrow_schema, out); } +/// \brief Compute the number of bytes required to represent a decimal of a +/// given precision. Taken from the Apache Impala codebase. The comments next +/// to the return values are the maximum value that can be represented in 2's +/// complement with the returned number of bytes. 
+int32_t DecimalSize(int32_t precision) { + DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got " + << precision; + DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got " + << precision; + + switch (precision) { + case 1: + case 2: + return 1; // 127 + case 3: + case 4: + return 2; // 32,767 + case 5: + case 6: + return 3; // 8,388,607 + case 7: + case 8: + case 9: + return 4; // 2,147,483,647 + case 10: + case 11: + return 5; // 549,755,813,887 + case 12: + case 13: + case 14: + return 6; // 140,737,488,355,327 + case 15: + case 16: + return 7; // 36,028,797,018,963,967 + case 17: + case 18: + return 8; // 9,223,372,036,854,775,807 + case 19: + case 20: + case 21: + return 9; // 2,361,183,241,434,822,606,847 + case 22: + case 23: + return 10; // 604,462,909,807,314,587,353,087 + case 24: + case 25: + case 26: + return 11; // 154,742,504,910,672,534,362,390,527 + case 27: + case 28: + return 12; // 39,614,081,257,132,168,796,771,975,167 + case 29: + case 30: + case 31: + return 13; // 10,141,204,801,825,835,211,973,625,643,007 + case 32: + case 33: + return 14; // 2,596,148,429,267,413,814,265,248,164,610,047 + case 34: + case 35: + return 15; // 664,613,997,892,457,936,451,903,530,140,172,287 + case 36: + case 37: + case 38: + return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727 + default: + DCHECK(false); + break; + } + return -1; +} + } // namespace arrow } // namespace parquet diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h index de153eb7..3b212da7 100644 --- a/src/parquet/arrow/schema.h +++ b/src/parquet/arrow/schema.h @@ -85,6 +85,8 @@ ::arrow::Status PARQUET_EXPORT ToParquetSchema(const ::arrow::Schema* arrow_sche const WriterProperties& properties, std::shared_ptr* out); +int32_t DecimalSize(int32_t precision); + } // namespace arrow } // namespace parquet diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index 954a84f3..df0c98a7 
100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -21,6 +21,7 @@ #include "arrow/api.h" #include "arrow/test-util.h" #include "arrow/type_traits.h" +#include "arrow/util/decimal.h" namespace parquet { namespace arrow { @@ -46,6 +47,9 @@ using is_arrow_binary = std::is_same; template using is_arrow_fixed_size_binary = std::is_same; +template +using is_arrow_decimal = std::is_same; + template using is_arrow_bool = std::is_same; @@ -114,6 +118,21 @@ NonNullArray(size_t size, std::shared_ptr* out) { return builder.Finish(out); } +template +typename std::enable_if::value, Status>::type NonNullArray( + size_t size, std::shared_ptr* out) { + using BuilderType = typename ::arrow::TypeTraits::BuilderType; + + // todo: find a way to generate test data with more diversity. + BuilderType builder(::arrow::decimal(24, 7)); + for (size_t i = 0; i < size; i++) { + // XXX: Decimal128 value(-45047LL, 18388229154599321957ULL) + ::arrow::Decimal128 value("-83095209205923957.2323995"); + RETURN_NOT_OK(builder.Append(value)); + } + return builder.Finish(out); +} + template typename std::enable_if::value, Status>::type NonNullArray( size_t size, std::shared_ptr* out) { @@ -246,6 +265,29 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, return builder.Finish(out); } +template +typename std::enable_if::value, Status>::type NullableArray( + size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) { + std::vector valid_bytes(size, 1); + + for (size_t i = 0; i < num_nulls; i++) { + valid_bytes[i * 2] = 0; + } + + using BuilderType = typename ::arrow::TypeTraits::BuilderType; + BuilderType builder(::arrow::decimal(24, 7)); + + for (size_t i = 0; i < size; i++) { + if (!valid_bytes[i]) { + RETURN_NOT_OK(builder.AppendNull()); + } else { + ::arrow::Decimal128 value("-83095209205923957.2323995"); + RETURN_NOT_OK(builder.Append(value)); + } + } + return builder.Finish(out); +} + // This helper function only supports (size/2) 
nulls yet. template typename std::enable_if::value, Status>::type NullableArray( diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index b53c1cac..fa1c299a 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -32,6 +32,7 @@ using arrow::Array; using arrow::BinaryArray; using arrow::FixedSizeBinaryArray; +using arrow::DecimalArray; using arrow::BooleanArray; using arrow::Int16Array; using arrow::Int16Builder; @@ -104,7 +105,6 @@ class LevelBuilder { NOT_IMPLEMENTED_VISIT(Struct) NOT_IMPLEMENTED_VISIT(Union) - NOT_IMPLEMENTED_VISIT(Decimal) NOT_IMPLEMENTED_VISIT(Dictionary) NOT_IMPLEMENTED_VISIT(Interval) @@ -743,8 +743,6 @@ Status FileWriter::Impl::TypedWriteBatch( buffer_ptr[i] = ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]); } - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); } else { int buffer_idx = 0; for (int64_t i = 0; i < data->length(); i++) { @@ -753,9 +751,9 @@ Status FileWriter::Impl::TypedWriteBatch( ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]); } } - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); PARQUET_CATCH_NOT_OK(writer->Close()); return Status::OK(); } @@ -765,29 +763,86 @@ Status FileWriter::Impl::TypedWriteBatch ColumnWriter* column_writer, const std::shared_ptr& array, int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) { RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(FLBA), false)); - auto data = static_cast(array.get()); + const auto& data = static_cast(*array); + const int64_t length = data.length(); + auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); auto writer = reinterpret_cast*>(column_writer); - if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) { + if 
(writer->descr()->schema_node()->is_required() || data.null_count() == 0) { // no nulls, just dump the data // todo(advancedxy): use a writeBatch to avoid this step - for (int64_t i = 0; i < data->length(); i++) { - buffer_ptr[i] = FixedLenByteArray(data->GetValue(i)); + for (int64_t i = 0; i < length; i++) { + buffer_ptr[i] = FixedLenByteArray(data.GetValue(i)); } - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); } else { int buffer_idx = 0; - for (int64_t i = 0; i < data->length(); i++) { - if (!data->IsNull(i)) { - buffer_ptr[buffer_idx++] = FixedLenByteArray(data->GetValue(i)); + for (int64_t i = 0; i < length; i++) { + if (!data.IsNull(i)) { + buffer_ptr[buffer_idx++] = FixedLenByteArray(data.GetValue(i)); + } + } + } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); + PARQUET_CATCH_NOT_OK(writer->Close()); + return Status::OK(); +} + +template <> +Status FileWriter::Impl::TypedWriteBatch( + ColumnWriter* column_writer, const std::shared_ptr& array, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels) { + const auto& data = static_cast(*array); + + const int64_t length = data.length(); + + RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(FLBA), false)); + auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); + + auto writer = reinterpret_cast*>(column_writer); + + const auto& decimal_type = static_cast(*data.type()); + const int32_t precision = decimal_type.precision(); + const int32_t minimum_necessary_bytes = DecimalSize(precision); + const int32_t offset = decimal_type.byte_width() - minimum_necessary_bytes; + + if (writer->descr()->schema_node()->is_required() || data.null_count() == 0) { + // no nulls, just dump the data + // todo(advancedxy): use a writeBatch to avoid this step + for (int64_t i = 0; i < length; ++i) { + // bytes may be in little endian order, so swap + const uint8_t* raw_value = data.GetValue(i); + auto 
unsigned_64_bit = reinterpret_cast(raw_value); + const uint64_t value[] = {::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]), + ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0])}; + + // We only write the minimum number of bytes necessary to represent the value + // So start writing data from 16 - number of bytes necessary to represent the value + auto value_bytes = reinterpret_cast(value); + buffer_ptr[i] = FixedLenByteArray(value_bytes + offset); + } + } else { + int32_t buffer_idx = 0; + + for (int64_t i = 0; i < length; ++i) { + if (!data.IsNull(i)) { + const uint8_t* raw_value = data.GetValue(i); + auto unsigned_64_bit = reinterpret_cast(raw_value); + const uint64_t value[] = {::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]), + ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0])}; + + // We only write the minimum number of bytes necessary to represent the value + // So start writing data from 16 - number of bytes necessary to represent the + // value + auto value_bytes = reinterpret_cast(value); + buffer_ptr[buffer_idx++] = FixedLenByteArray(value_bytes + offset); } } - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); PARQUET_CATCH_NOT_OK(writer->Close()); return Status::OK(); } @@ -896,6 +951,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) { WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType) WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType) WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType) + WRITE_BATCH_CASE(DECIMAL, DecimalType, FLBAType) WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type) WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type) WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type) diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h index be387522..477083e1 100644 --- a/src/parquet/encoding-internal.h +++ b/src/parquet/encoding-internal.h @@ -420,11 +420,9 @@ 
inline void DictionaryDecoder::SetDict(Decoder* dictionary) PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false)); uint8_t* bytes_data = byte_array_data_->mutable_data(); - int offset = 0; - for (int i = 0; i < num_dictionary_values; ++i) { + for (int32_t i = 0, offset = 0; i < num_dictionary_values; ++i, offset += fixed_len) { memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len); dictionary_[i].ptr = bytes_data + offset; - offset += fixed_len; } } diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc index 9b9bde9f..4ec48a45 100644 --- a/src/parquet/file/reader.cc +++ b/src/parquet/file/reader.cc @@ -45,9 +45,9 @@ RowGroupReader::RowGroupReader(std::unique_ptr contents) : contents_(std::move(contents)) {} std::shared_ptr RowGroupReader::Column(int i) { - DCHECK(i < metadata()->num_columns()) << "The RowGroup only has " - << metadata()->num_columns() - << "columns, requested column: " << i; + DCHECK(i < metadata()->num_columns()) + << "The RowGroup only has " << metadata()->num_columns() + << "columns, requested column: " << i; const ColumnDescriptor* descr = metadata()->schema()->Column(i); std::unique_ptr page_reader = contents_->GetColumnPageReader(i); @@ -57,9 +57,9 @@ std::shared_ptr RowGroupReader::Column(int i) { } std::unique_ptr RowGroupReader::GetColumnPageReader(int i) { - DCHECK(i < metadata()->num_columns()) << "The RowGroup only has " - << metadata()->num_columns() - << "columns, requested column: " << i; + DCHECK(i < metadata()->num_columns()) + << "The RowGroup only has " << metadata()->num_columns() + << "columns, requested column: " << i; return contents_->GetColumnPageReader(i); } @@ -127,9 +127,9 @@ std::shared_ptr ParquetFileReader::metadata() const { } std::shared_ptr ParquetFileReader::RowGroup(int i) { - DCHECK(i < metadata()->num_row_groups()) << "The file only has " - << metadata()->num_row_groups() - << "row groups, requested reader for: " << i; + DCHECK(i < metadata()->num_row_groups()) + << "The file 
only has " << metadata()->num_row_groups() + << "row groups, requested reader for: " << i; return contents_->GetRowGroup(i); } From 613255ec49ca11d8f7c3e541450b6e6cda14389b Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Wed, 25 Oct 2017 13:32:19 -0400 Subject: [PATCH 02/30] Do not use std::copy when reinterpret_cast will suffice --- src/parquet/arrow/reader.cc | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 53273f71..0ec3035e 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -1051,13 +1051,16 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data)); uint8_t* out_ptr = data->mutable_data(); + using ::arrow::BitUtil::ToLittleEndian; + for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { const ElementType value = values[i]; - const uint64_t raw[] = { - ::arrow::BitUtil::ToLittleEndian(static_cast(value)), - ::arrow::BitUtil::ToLittleEndian(static_cast(value < 0 ? -1 : 0))}; - const auto bytes = reinterpret_cast(raw); - std::copy(bytes, bytes + type_length, out_ptr); + + auto out_ptr_view = reinterpret_cast(out_ptr); + out_ptr_view[0] = ToLittleEndian(static_cast(value)); + + // no need to byteswap here because we're either all ones or all zeros + out_ptr_view[1] = static_cast(value < 0 ? 
-1 : 0); } if (reader->nullable_values()) { From 46dff1548b0a64be1ea4dadb53b888790547f914 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 31 Oct 2017 11:44:13 -0400 Subject: [PATCH 03/30] Clean up uint32 test --- src/parquet/arrow/arrow-reader-writer-test.cc | 38 ++++++++++++++----- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index b068ff7d..2108158d 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -885,25 +885,43 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { ASSERT_OK_NO_THROW( WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties)); - std::shared_ptr expected_values; std::shared_ptr int64_data = std::make_shared(::arrow::default_memory_pool()); { ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length())); - int64_t* int64_data_ptr = reinterpret_cast(int64_data->mutable_data()); - const uint32_t* uint32_data_ptr = - reinterpret_cast(values->values()->data()); - // std::copy might be faster but this is explicit on the casts) - for (int64_t i = 0; i < values->length(); i++) { - int64_data_ptr[i] = static_cast(uint32_data_ptr[i]); - } + auto int64_data_ptr = reinterpret_cast(int64_data->mutable_data()); + auto uint32_data_ptr = reinterpret_cast(values->values()->data()); + const auto cast_uint32_to_int64 = [](uint32_t value) { + return static_cast(value); + }; + std::transform(uint32_data_ptr, uint32_data_ptr + values->length(), int64_data_ptr, + cast_uint32_to_int64); } std::vector> buffers{values->null_bitmap(), int64_data}; auto arr_data = std::make_shared<::arrow::ArrayData>(::arrow::int64(), values->length(), buffers, values->null_count()); - ASSERT_OK(MakeArray(arr_data, &expected_values)); - this->ReadAndCheckSingleColumnTable(expected_values); + std::shared_ptr expected_values = MakeArray(arr_data); + ASSERT_NE(expected_values, 
NULLPTR); + + const auto& expected = static_cast(*expected_values); + ASSERT_GT(values->length(), 0); + ASSERT_EQ(values->length(), expected.length()); + + // TODO(phillipc): Is there a better way to compare these two arrays? + // AssertArraysEqual requires the same type, but we only care about values in this case + for (int i = 0; i < expected.length(); ++i) { + const bool value_is_valid = values->IsValid(i); + const bool expected_value_is_valid = expected.IsValid(i); + + ASSERT_EQ(value_is_valid, expected_value_is_valid); + + if (value_is_valid) { + uint32_t value = values->Value(i); + int64_t expected_value = expected.Value(i); + ASSERT_EQ(value, expected_value); + } + } } using TestStringParquetIO = TestParquetIO<::arrow::StringType>; From 028fb0349d6924ef0cae720aaf7a4386e0edf10e Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Fri, 3 Nov 2017 12:14:01 -0400 Subject: [PATCH 04/30] Remove garbage values --- src/parquet/arrow/arrow-reader-writer-test.cc | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 2108158d..632a16bd 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -300,17 +300,9 @@ struct test_traits<::arrow::FixedSizeBinaryType> { static std::string const value; }; -template <> -struct test_traits<::arrow::DecimalType> { - static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY; - static ::arrow::Decimal128 const value; -}; - const std::string test_traits<::arrow::StringType>::value("Test"); // NOLINT const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03"); // NOLINT const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed"); // NOLINT -const ::arrow::Decimal128 test_traits<::arrow::DecimalType>::value( - "-83095209205923957.2323995"); // NOLINT template using ParquetDataType = DataType::parquet_enum>; From 
3d243d54aaf4e80ed0b4f591275b9a14fdd9da94 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Fri, 3 Nov 2017 18:17:26 -0400 Subject: [PATCH 05/30] Checkpoint [ci skip] --- src/parquet/arrow/test-util.h | 71 +++++++++++++++++++++++++++++++---- 1 file changed, 64 insertions(+), 7 deletions(-) diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index df0c98a7..9cf1c520 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -118,16 +118,67 @@ NonNullArray(size_t size, std::shared_ptr* out) { return builder.Finish(out); } +static void DecimalRange(int32_t precision, ::arrow::Decimal128* min_decimal, + ::arrow::Decimal128* max_decimal) { + DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got " + << precision; + DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got " + << precision; + DCHECK_NE(min_decimal, NULLPTR); + DCHECK_NE(max_decimal, NULLPTR); + + *max_decimal = 1; + for (int32_t i = 0; i < precision; ++i) { + *max_decimal *= 10; + } + *max_decimal -= 1; + *min_decimal = -(*max_decimal); +} + +class UniformDecimalDistribution { + public: + explicit UniformDecimalDistribution(int32_t precision) { + ::arrow::Decimal128 min_decimal; + ::arrow::Decimal128 max_decimal; + + DecimalRange(precision, &min_decimal, &max_decimal); + + const auto min_lower = static_cast(min_decimal.low_bits()); + const auto max_lower = static_cast(max_decimal.low_bits()); + + lower_bits_ = std::uniform_int_distribution(min_lower, max_lower); + upper_bits_ = std::uniform_int_distribution(min_decimal.high_bits(), + max_decimal.high_bits()); + } + + template + ::arrow::Decimal128 operator()(Generator& gen) { + return ::arrow::Decimal128(upper_bits_(gen), static_cast(lower_bits_(gen))); + } + + private: + std::uniform_int_distribution lower_bits_; + std::uniform_int_distribution upper_bits_; +}; + template typename std::enable_if::value, Status>::type NonNullArray( size_t size, 
std::shared_ptr* out) { - using BuilderType = typename ::arrow::TypeTraits::BuilderType; + constexpr int32_t kDecimalPrecision = 4; + constexpr int32_t kDecimalScale = 2; // todo: find a way to generate test data with more diversity. - BuilderType builder(::arrow::decimal(24, 7)); + const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale); + + using BuilderType = typename ::arrow::TypeTraits::BuilderType; + BuilderType builder(type); + + constexpr int32_t seed = 0; + std::mt19937 gen(seed); + UniformDecimalDistribution decimal_dist(kDecimalPrecision); + for (size_t i = 0; i < size; i++) { - // XXX: Decimal128 value(-45047LL, 18388229154599321957ULL) - ::arrow::Decimal128 value("-83095209205923957.2323995"); + const ::arrow::Decimal128 value(decimal_dist(gen)); RETURN_NOT_OK(builder.Append(value)); } return builder.Finish(out); @@ -274,15 +325,21 @@ typename std::enable_if::value, Status>::type Nullab valid_bytes[i * 2] = 0; } + constexpr int32_t kDecimalPrecision = 24; + constexpr int32_t kDecimalScale = 7; + const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale); + using BuilderType = typename ::arrow::TypeTraits::BuilderType; - BuilderType builder(::arrow::decimal(24, 7)); + BuilderType builder(type); + + std::mt19937 gen(seed); + UniformDecimalDistribution decimal_dist(kDecimalPrecision); for (size_t i = 0; i < size; i++) { if (!valid_bytes[i]) { RETURN_NOT_OK(builder.AppendNull()); } else { - ::arrow::Decimal128 value("-83095209205923957.2323995"); - RETURN_NOT_OK(builder.Append(value)); + RETURN_NOT_OK(builder.Append(decimal_dist(gen))); } } return builder.Finish(out); From 1782da05fdc024abd49d17318a8b1c6e790bdd2f Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 7 Nov 2017 16:34:48 -0500 Subject: [PATCH 06/30] Use arrow --- src/parquet/arrow/test-util.h | 51 +++-------------------------------- 1 file changed, 4 insertions(+), 47 deletions(-) diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index 
9cf1c520..6119ca2f 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -118,54 +118,11 @@ NonNullArray(size_t size, std::shared_ptr* out) { return builder.Finish(out); } -static void DecimalRange(int32_t precision, ::arrow::Decimal128* min_decimal, - ::arrow::Decimal128* max_decimal) { - DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got " - << precision; - DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got " - << precision; - DCHECK_NE(min_decimal, NULLPTR); - DCHECK_NE(max_decimal, NULLPTR); - - *max_decimal = 1; - for (int32_t i = 0; i < precision; ++i) { - *max_decimal *= 10; - } - *max_decimal -= 1; - *min_decimal = -(*max_decimal); -} - -class UniformDecimalDistribution { - public: - explicit UniformDecimalDistribution(int32_t precision) { - ::arrow::Decimal128 min_decimal; - ::arrow::Decimal128 max_decimal; - - DecimalRange(precision, &min_decimal, &max_decimal); - - const auto min_lower = static_cast(min_decimal.low_bits()); - const auto max_lower = static_cast(max_decimal.low_bits()); - - lower_bits_ = std::uniform_int_distribution(min_lower, max_lower); - upper_bits_ = std::uniform_int_distribution(min_decimal.high_bits(), - max_decimal.high_bits()); - } - - template - ::arrow::Decimal128 operator()(Generator& gen) { - return ::arrow::Decimal128(upper_bits_(gen), static_cast(lower_bits_(gen))); - } - - private: - std::uniform_int_distribution lower_bits_; - std::uniform_int_distribution upper_bits_; -}; - template typename std::enable_if::value, Status>::type NonNullArray( size_t size, std::shared_ptr* out) { - constexpr int32_t kDecimalPrecision = 4; - constexpr int32_t kDecimalScale = 2; + constexpr int32_t kDecimalPrecision = 24; + constexpr int32_t kDecimalScale = 7; // todo: find a way to generate test data with more diversity. 
const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale); @@ -175,7 +132,7 @@ typename std::enable_if::value, Status>::type NonNul constexpr int32_t seed = 0; std::mt19937 gen(seed); - UniformDecimalDistribution decimal_dist(kDecimalPrecision); + ::arrow::test::UniformDecimalDistribution decimal_dist(kDecimalPrecision); for (size_t i = 0; i < size; i++) { const ::arrow::Decimal128 value(decimal_dist(gen)); @@ -333,7 +290,7 @@ typename std::enable_if::value, Status>::type Nullab BuilderType builder(type); std::mt19937 gen(seed); - UniformDecimalDistribution decimal_dist(kDecimalPrecision); + ::arrow::test::UniformDecimalDistribution decimal_dist(kDecimalPrecision); for (size_t i = 0; i < size; i++) { if (!valid_bytes[i]) { From 5c9292b42fa2c732d3524a801ab1663f6c38e439 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Thu, 9 Nov 2017 12:17:20 -0500 Subject: [PATCH 07/30] Proper dcheck call --- src/parquet/encoding-internal.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h index 477083e1..98b50489 100644 --- a/src/parquet/encoding-internal.h +++ b/src/parquet/encoding-internal.h @@ -595,7 +595,7 @@ inline int DictEncoder::Hash(const typename DType::c_type& value) const { template <> inline int DictEncoder::Hash(const ByteArray& value) const { if (value.len > 0) { - DCHECK(nullptr != value.ptr) << "Value ptr cannot be NULL"; + DCHECK_NE(nullptr, value.ptr) << "Value ptr cannot be NULL"; } return HashUtil::Hash(value.ptr, value.len, 0); } @@ -603,7 +603,7 @@ inline int DictEncoder::Hash(const ByteArray& value) const { template <> inline int DictEncoder::Hash(const FixedLenByteArray& value) const { if (type_length_ > 0) { - DCHECK(nullptr != value.ptr) << "Value ptr cannot be NULL"; + DCHECK_NE(nullptr, value.ptr) << "Value ptr cannot be NULL"; } return HashUtil::Hash(value.ptr, type_length_, 0); } From e162ca18968dbac3b0102eb785bbca2784ae3cdd Mon Sep 17 00:00:00 2001 
From: Phillip Cloud Date: Thu, 9 Nov 2017 15:32:11 -0500 Subject: [PATCH 08/30] Allocate scratch space to hold the byteswapped values --- src/parquet/arrow/writer.cc | 43 ++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index fa1c299a..c82b6dc2 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -795,49 +795,42 @@ Status FileWriter::Impl::TypedWriteBatch( ColumnWriter* column_writer, const std::shared_ptr& array, int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) { const auto& data = static_cast(*array); - const int64_t length = data.length(); + std::vector big_endian_values(static_cast(length) * 2); + RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(FLBA), false)); auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); auto writer = reinterpret_cast*>(column_writer); const auto& decimal_type = static_cast(*data.type()); - const int32_t precision = decimal_type.precision(); - const int32_t minimum_necessary_bytes = DecimalSize(precision); - const int32_t offset = decimal_type.byte_width() - minimum_necessary_bytes; + const int32_t offset = + decimal_type.byte_width() - DecimalSize(decimal_type.precision()); if (writer->descr()->schema_node()->is_required() || data.null_count() == 0) { // no nulls, just dump the data // todo(advancedxy): use a writeBatch to avoid this step - for (int64_t i = 0; i < length; ++i) { - // bytes may be in little endian order, so swap - const uint8_t* raw_value = data.GetValue(i); - auto unsigned_64_bit = reinterpret_cast(raw_value); - const uint64_t value[] = {::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]), - ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0])}; - - // We only write the minimum number of bytes necessary to represent the value - // So start writing data from 16 - number of bytes necessary to represent the value - auto value_bytes = 
reinterpret_cast(value); - buffer_ptr[i] = FixedLenByteArray(value_bytes + offset); + for (int64_t i = 0, j = 0; i < length; ++i, j += 2) { + auto unsigned_64_bit = reinterpret_cast(data.GetValue(i)); + big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); + big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); + buffer_ptr[i] = FixedLenByteArray( + reinterpret_cast(&big_endian_values[j]) + offset); } } else { int32_t buffer_idx = 0; + int32_t j = 0; for (int64_t i = 0; i < length; ++i) { if (!data.IsNull(i)) { - const uint8_t* raw_value = data.GetValue(i); - auto unsigned_64_bit = reinterpret_cast(raw_value); - const uint64_t value[] = {::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]), - ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0])}; - - // We only write the minimum number of bytes necessary to represent the value - // So start writing data from 16 - number of bytes necessary to represent the - // value - auto value_bytes = reinterpret_cast(value); - buffer_ptr[buffer_idx++] = FixedLenByteArray(value_bytes + offset); + auto unsigned_64_bit = reinterpret_cast(data.GetValue(i)); + big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); + big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); + buffer_ptr[buffer_idx] = FixedLenByteArray( + reinterpret_cast(&big_endian_values[j]) + offset); + ++buffer_idx; + j += 2; } } } From 659fbc19fc0939d27199cf9efd3f4d01421082e6 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Thu, 9 Nov 2017 17:03:43 -0500 Subject: [PATCH 09/30] Fix deprecated API call --- src/parquet/arrow/reader.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 0ec3035e..4546cca3 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -870,7 +870,7 @@ struct TransferFunctor< // Convert from BINARY type to STRING auto new_data = (*out)->data()->ShallowCopy(); 
new_data->type = type; - RETURN_NOT_OK(::arrow::MakeArray(new_data, out)); + *out = ::arrow::MakeArray(new_data); } return Status::OK(); } From 8808e4c23a088924fc7ee4b56875adb3d22b5d92 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Thu, 9 Nov 2017 17:06:47 -0500 Subject: [PATCH 10/30] Bump arrow version --- cmake_modules/ThirdpartyToolchain.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index 601b4180..c6e2a8c0 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -366,7 +366,7 @@ if (NOT ARROW_FOUND) -DARROW_BUILD_TESTS=OFF) if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "") - set(ARROW_VERSION "b2596f66ac2a8aa66da81c1f75c36fa30c40d0df") + set(ARROW_VERSION "3188d70202795d8e0a8092ec5685d859b02e366d") else() set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}") endif() From 1eee6a98c7669e2f22ce2570232c340658e0a8b1 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Fri, 10 Nov 2017 18:03:02 -0500 Subject: [PATCH 11/30] Remove specific randint call --- src/parquet/arrow/arrow-reader-writer-test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 632a16bd..1e5b2445 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -1471,7 +1471,7 @@ void MakeListTable(int num_rows, std::shared_ptr
* out) { offset_values.push_back(total_elements); std::vector value_draws; - randint(total_elements, 0, 100, &value_draws); + randint(total_elements, 0, 100, &value_draws); std::vector is_valid; random_is_valid(total_elements, 0.1, &is_valid); From 9ff7eb4f1fd1698509424fb59421fabedfaebf25 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Fri, 10 Nov 2017 18:07:48 -0500 Subject: [PATCH 12/30] Remove specific template parameters --- src/parquet/arrow/test-util.h | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index 6119ca2f..e67cccfe 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -56,8 +56,10 @@ using is_arrow_bool = std::is_same; template typename std::enable_if::value, Status>::type NonNullArray( size_t size, std::shared_ptr* out) { - std::vector values; - ::arrow::test::random_real(size, 0, 0, 1, &values); + using c_type = typename ArrowType::c_type; + std::vector values; + ::arrow::test::random_real(size, 0, static_cast(0), static_cast(1), + &values); ::arrow::NumericBuilder builder; RETURN_NOT_OK(builder.Append(values.data(), values.size())); return builder.Finish(out); @@ -68,7 +70,7 @@ typename std::enable_if< is_arrow_int::value && !is_arrow_date::value, Status>::type NonNullArray(size_t size, std::shared_ptr* out) { std::vector values; - ::arrow::test::randint(size, 0, 64, &values); + ::arrow::test::randint(size, 0, 64, &values); // Passing data type so this will work with TimestampType too ::arrow::NumericBuilder builder(std::make_shared(), @@ -81,7 +83,7 @@ template typename std::enable_if::value, Status>::type NonNullArray( size_t size, std::shared_ptr* out) { std::vector values; - ::arrow::test::randint(size, 0, 64, &values); + ::arrow::test::randint(size, 0, 64, &values); for (size_t i = 0; i < size; i++) { values[i] *= 86400000; } @@ -145,7 +147,7 @@ template typename std::enable_if::value, Status>::type 
NonNullArray( size_t size, std::shared_ptr* out) { std::vector values; - ::arrow::test::randint(size, 0, 1, &values); + ::arrow::test::randint(size, 0, 1, &values); ::arrow::BooleanBuilder builder; RETURN_NOT_OK(builder.Append(values.data(), values.size())); return builder.Finish(out); @@ -155,9 +157,10 @@ typename std::enable_if::value, Status>::type NonNullAr template typename std::enable_if::value, Status>::type NullableArray( size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr* out) { - std::vector values; - ::arrow::test::random_real(size, seed, -1e10, 1e10, - &values); + using c_type = typename ArrowType::c_type; + std::vector values; + ::arrow::test::random_real(size, seed, static_cast(-1e10), + static_cast(1e10), &values); std::vector valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { @@ -178,7 +181,7 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr(size, 0, 64, &values); + ::arrow::test::randint(size, 0, 64, &values); std::vector valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { @@ -199,7 +202,7 @@ typename std::enable_if::value, Status>::type NullableA // Seed is random in Arrow right now (void)seed; - ::arrow::test::randint(size, 0, 64, &values); + ::arrow::test::randint(size, 0, 64, &values); for (size_t i = 0; i < size; i++) { values[i] *= 86400000; } @@ -311,7 +314,7 @@ typename std::enable_if::value, Status>::type NullableA // Seed is random in Arrow right now (void)seed; - ::arrow::test::randint(size, 0, 1, &values); + ::arrow::test::randint(size, 0, 1, &values); std::vector valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { From 30655d6ae348c7d0fb2d09ea830fdf5e6f2555fd Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Sat, 11 Nov 2017 14:31:52 -0500 Subject: [PATCH 13/30] Use arrow random_decimals --- src/parquet/arrow/test-util.h | 78 ++++++++++++++++++----------------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/src/parquet/arrow/test-util.h 
b/src/parquet/arrow/test-util.h index e67cccfe..3d3d468b 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -29,6 +29,16 @@ namespace arrow { using ::arrow::Array; using ::arrow::Status; +template +struct DecimalWithPrecisionAndScale { + static_assert(PRECISION >= 1 && PRECISION <= 38, "Invalid precision value"); + + using type = ::arrow::DecimalType; + static constexpr ::arrow::Type::type type_id = ::arrow::DecimalType::type_id; + static constexpr int32_t precision = PRECISION; + static constexpr int32_t scale = PRECISION - 1; +}; + template using is_arrow_float = std::is_floating_point; @@ -47,9 +57,6 @@ using is_arrow_binary = std::is_same; template using is_arrow_fixed_size_binary = std::is_same; -template -using is_arrow_decimal = std::is_same; - template using is_arrow_bool = std::is_same; @@ -120,26 +127,25 @@ NonNullArray(size_t size, std::shared_ptr* out) { return builder.Finish(out); } -template -typename std::enable_if::value, Status>::type NonNullArray( - size_t size, std::shared_ptr* out) { - constexpr int32_t kDecimalPrecision = 24; - constexpr int32_t kDecimalScale = 7; +template +typename std::enable_if< + std::is_same>::value, Status>::type +NonNullArray(size_t size, std::shared_ptr* out) { + constexpr int32_t kDecimalPrecision = precision; + constexpr int32_t kDecimalScale = DecimalWithPrecisionAndScale::scale; - // todo: find a way to generate test data with more diversity. 
const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale); - - using BuilderType = typename ::arrow::TypeTraits::BuilderType; - BuilderType builder(type); + ::arrow::DecimalBuilder builder(type); + const int32_t byte_width = static_cast(*type).byte_width(); constexpr int32_t seed = 0; - std::mt19937 gen(seed); - ::arrow::test::UniformDecimalDistribution decimal_dist(kDecimalPrecision); - for (size_t i = 0; i < size; i++) { - const ::arrow::Decimal128 value(decimal_dist(gen)); - RETURN_NOT_OK(builder.Append(value)); - } + std::shared_ptr out_buf; + RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * byte_width, + &out_buf)); + ::arrow::test::random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data()); + + RETURN_NOT_OK(builder.Append(out_buf->data(), size)); return builder.Finish(out); } @@ -276,32 +282,30 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, return builder.Finish(out); } -template -typename std::enable_if::value, Status>::type NullableArray( - size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) { - std::vector valid_bytes(size, 1); +template +typename std::enable_if< + std::is_same>::value, Status>::type +NullableArray(size_t size, size_t num_nulls, uint32_t seed, + std::shared_ptr<::arrow::Array>* out) { + std::vector valid_bytes(size, '\1'); - for (size_t i = 0; i < num_nulls; i++) { - valid_bytes[i * 2] = 0; + for (size_t i = 0; i < num_nulls; ++i) { + valid_bytes[i * 2] = '\0'; } - constexpr int32_t kDecimalPrecision = 24; - constexpr int32_t kDecimalScale = 7; + constexpr int32_t kDecimalPrecision = precision; + constexpr int32_t kDecimalScale = DecimalWithPrecisionAndScale::scale; const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale); + const int32_t byte_width = static_cast(*type).byte_width(); - using BuilderType = typename ::arrow::TypeTraits::BuilderType; - BuilderType builder(type); + std::shared_ptr<::arrow::Buffer> out_buf; + 
RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * byte_width, + &out_buf)); - std::mt19937 gen(seed); - ::arrow::test::UniformDecimalDistribution decimal_dist(kDecimalPrecision); + ::arrow::test::random_decimals(size, seed, precision, out_buf->mutable_data()); - for (size_t i = 0; i < size; i++) { - if (!valid_bytes[i]) { - RETURN_NOT_OK(builder.AppendNull()); - } else { - RETURN_NOT_OK(builder.Append(decimal_dist(gen))); - } - } + ::arrow::DecimalBuilder builder(type); + RETURN_NOT_OK(builder.Append(out_buf->data(), size, valid_bytes.data())); return builder.Finish(out); } From 7ab2e5cb01d3569e28729c5ffb92ea1a6d47fb8b Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Sat, 11 Nov 2017 14:32:04 -0500 Subject: [PATCH 14/30] Parameterize on precision --- src/parquet/arrow/arrow-reader-writer-test.cc | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 1e5b2445..0e43e4fa 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -551,7 +551,26 @@ typedef ::testing::Types< ::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType, - ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, ::arrow::DecimalType> + ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, DecimalWithPrecisionAndScale<1>, + DecimalWithPrecisionAndScale<2>, DecimalWithPrecisionAndScale<3>, + DecimalWithPrecisionAndScale<4>, DecimalWithPrecisionAndScale<5>, + DecimalWithPrecisionAndScale<6>, DecimalWithPrecisionAndScale<7>, + DecimalWithPrecisionAndScale<8>, DecimalWithPrecisionAndScale<9>, + DecimalWithPrecisionAndScale<10>, DecimalWithPrecisionAndScale<11>, + DecimalWithPrecisionAndScale<12>, 
DecimalWithPrecisionAndScale<13>, + DecimalWithPrecisionAndScale<14>, DecimalWithPrecisionAndScale<15>, + DecimalWithPrecisionAndScale<16>, DecimalWithPrecisionAndScale<17>, + DecimalWithPrecisionAndScale<19>, DecimalWithPrecisionAndScale<20>, + DecimalWithPrecisionAndScale<21>, DecimalWithPrecisionAndScale<22>, + DecimalWithPrecisionAndScale<23>, DecimalWithPrecisionAndScale<24>, + DecimalWithPrecisionAndScale<25>, DecimalWithPrecisionAndScale<26>, + DecimalWithPrecisionAndScale<27>, DecimalWithPrecisionAndScale<28>, + DecimalWithPrecisionAndScale<29>, DecimalWithPrecisionAndScale<30>, + DecimalWithPrecisionAndScale<31>, DecimalWithPrecisionAndScale<32>, + DecimalWithPrecisionAndScale<33>, DecimalWithPrecisionAndScale<34>, + // no 36 and 37 because gtest only allows us to have 50 template arguments to + // ::testing::Types + DecimalWithPrecisionAndScale<35>, DecimalWithPrecisionAndScale<38>> TestTypes; TYPED_TEST_CASE(TestParquetIO, TestTypes); From 6c9e2a7c407b49d9d69aa1084b36e4b8f43101b3 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Sat, 11 Nov 2017 15:18:18 -0500 Subject: [PATCH 15/30] Reduce the number of decimal test cases --- src/parquet/arrow/arrow-reader-writer-test.cc | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 0e43e4fa..98c1ea21 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -552,25 +552,14 @@ typedef ::testing::Types< ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, DecimalWithPrecisionAndScale<1>, - DecimalWithPrecisionAndScale<2>, DecimalWithPrecisionAndScale<3>, - DecimalWithPrecisionAndScale<4>, DecimalWithPrecisionAndScale<5>, - DecimalWithPrecisionAndScale<6>, 
DecimalWithPrecisionAndScale<7>, - DecimalWithPrecisionAndScale<8>, DecimalWithPrecisionAndScale<9>, - DecimalWithPrecisionAndScale<10>, DecimalWithPrecisionAndScale<11>, - DecimalWithPrecisionAndScale<12>, DecimalWithPrecisionAndScale<13>, - DecimalWithPrecisionAndScale<14>, DecimalWithPrecisionAndScale<15>, - DecimalWithPrecisionAndScale<16>, DecimalWithPrecisionAndScale<17>, - DecimalWithPrecisionAndScale<19>, DecimalWithPrecisionAndScale<20>, - DecimalWithPrecisionAndScale<21>, DecimalWithPrecisionAndScale<22>, - DecimalWithPrecisionAndScale<23>, DecimalWithPrecisionAndScale<24>, - DecimalWithPrecisionAndScale<25>, DecimalWithPrecisionAndScale<26>, - DecimalWithPrecisionAndScale<27>, DecimalWithPrecisionAndScale<28>, - DecimalWithPrecisionAndScale<29>, DecimalWithPrecisionAndScale<30>, - DecimalWithPrecisionAndScale<31>, DecimalWithPrecisionAndScale<32>, - DecimalWithPrecisionAndScale<33>, DecimalWithPrecisionAndScale<34>, - // no 36 and 37 because gtest only allows us to have 50 template arguments to - // ::testing::Types - DecimalWithPrecisionAndScale<35>, DecimalWithPrecisionAndScale<38>> + DecimalWithPrecisionAndScale<3>, DecimalWithPrecisionAndScale<5>, + DecimalWithPrecisionAndScale<7>, DecimalWithPrecisionAndScale<10>, + DecimalWithPrecisionAndScale<12>, DecimalWithPrecisionAndScale<15>, + DecimalWithPrecisionAndScale<17>, DecimalWithPrecisionAndScale<19>, + DecimalWithPrecisionAndScale<22>, DecimalWithPrecisionAndScale<23>, + DecimalWithPrecisionAndScale<24>, DecimalWithPrecisionAndScale<27>, + DecimalWithPrecisionAndScale<29>, DecimalWithPrecisionAndScale<32>, + DecimalWithPrecisionAndScale<34>, DecimalWithPrecisionAndScale<38>> TestTypes; TYPED_TEST_CASE(TestParquetIO, TestTypes); From 64748a8ac341994551df45758877123c9909a4c5 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Sat, 11 Nov 2017 15:18:28 -0500 Subject: [PATCH 16/30] Copy from arrow for now --- src/parquet/arrow/test-util.h | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 
deletions(-) diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index 3d3d468b..ae4fb83a 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -127,6 +127,26 @@ NonNullArray(size_t size, std::shared_ptr* out) { return builder.Finish(out); } +static inline void random_decimals(int64_t n, uint32_t seed, int32_t precision, + uint8_t* out) { + std::mt19937 gen(seed); + std::uniform_int_distribution d(0, std::numeric_limits::max()); + const int32_t required_bytes = DecimalSize(precision); + constexpr int32_t byte_width = 16; + std::fill(out, out + byte_width * n, '\0'); + + for (int64_t i = 0; i < n; ++i, out += byte_width) { + std::generate(out, out + required_bytes, + [&d, &gen] { return static_cast(d(gen)); }); + + // sign extend if the sign bit is set for the last byte generated + // 0b10000000 == 0x80 == 128 + if ((out[required_bytes - 1] & '\x80') != 0) { + std::fill(out + required_bytes, out + byte_width, '\xFF'); + } + } +} + template typename std::enable_if< std::is_same>::value, Status>::type @@ -143,7 +163,7 @@ NonNullArray(size_t size, std::shared_ptr* out) { std::shared_ptr out_buf; RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * byte_width, &out_buf)); - ::arrow::test::random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data()); + random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data()); RETURN_NOT_OK(builder.Append(out_buf->data(), size)); return builder.Finish(out); @@ -302,7 +322,7 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * byte_width, &out_buf)); - ::arrow::test::random_decimals(size, seed, precision, out_buf->mutable_data()); + random_decimals(size, seed, precision, out_buf->mutable_data()); ::arrow::DecimalBuilder builder(type); RETURN_NOT_OK(builder.Append(out_buf->data(), size, valid_bytes.data())); From b2e0290f76d8bf03da793441290968f71d4b7245 
Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Sat, 11 Nov 2017 16:02:58 -0500 Subject: [PATCH 17/30] IWYU --- src/parquet/arrow/test-util.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index ae4fb83a..c7894a1b 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #include #include From 9f97c1d96c6067689cbd18fa634127fbc02f5769 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Sun, 12 Nov 2017 17:14:50 -0500 Subject: [PATCH 18/30] Update for ARROW-1794: rename DecimalArray to Decimal128Array --- src/parquet/arrow/reader.cc | 14 +++++++------- src/parquet/arrow/writer.cc | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 4546cca3..5101efd8 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -963,10 +963,10 @@ static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_wid BytesToIntegerPair(value, byte_width, high, low); } -/// \brief Convert an array of FixedLenByteArrays to an arrow::DecimalArray +/// \brief Convert an array of FixedLenByteArrays to an arrow::Decimal128Array /// We do this by: /// 1. Creating a arrow::FixedSizeBinaryArray from the RecordReader's builder -/// 2. Allocating a buffer for the arrow::DecimalArray +/// 2. Allocating a buffer for the arrow::Decimal128Array /// 3. Converting the big-endian bytes in the FixedSizeBinaryArray to two integers /// representing the high and low bits of each decimal value. 
template <> @@ -1018,13 +1018,13 @@ struct TransferFunctor<::arrow::DecimalType, FLBAType> { } } - *out = std::make_shared<::arrow::DecimalArray>( + *out = std::make_shared<::arrow::Decimal128Array>( type, length, data, fixed_size_binary_array.null_bitmap(), null_count); return Status::OK(); } }; -/// \brief Convert an Int32 or Int64 array into a DecimalArray +/// \brief Convert an Int32 or Int64 array into a Decimal128Array /// The parquet spec allows systems to write decimals in int32, int64 if the values are /// small enough to fit in less 4 bytes or less than 8 bytes, respectively. /// This function implements the conversion from int32 and int64 arrays to decimal arrays. @@ -1065,10 +1065,10 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, if (reader->nullable_values()) { std::shared_ptr is_valid = reader->ReleaseIsValid(); - *out = std::make_shared<::arrow::DecimalArray>(type, length, data, is_valid, - reader->null_count()); + *out = std::make_shared<::arrow::Decimal128Array>(type, length, data, is_valid, + reader->null_count()); } else { - *out = std::make_shared<::arrow::DecimalArray>(type, length, data); + *out = std::make_shared<::arrow::Decimal128Array>(type, length, data); } return Status::OK(); } diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index c82b6dc2..e605d898 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -32,7 +32,7 @@ using arrow::Array; using arrow::BinaryArray; using arrow::FixedSizeBinaryArray; -using arrow::DecimalArray; +using arrow::Decimal128Array; using arrow::BooleanArray; using arrow::Int16Array; using arrow::Int16Builder; @@ -794,7 +794,7 @@ template <> Status FileWriter::Impl::TypedWriteBatch( ColumnWriter* column_writer, const std::shared_ptr& array, int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) { - const auto& data = static_cast(*array); + const auto& data = static_cast(*array); const int64_t length = data.length(); 
std::vector big_endian_values(static_cast(length) * 2); From 920832ac624e437911024d8b69f89cacd294c5aa Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 13 Nov 2017 19:18:58 -0500 Subject: [PATCH 19/30] Update arrow version --- cmake_modules/ThirdpartyToolchain.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index c6e2a8c0..6f3ee627 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -366,7 +366,7 @@ if (NOT ARROW_FOUND) -DARROW_BUILD_TESTS=OFF) if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "") - set(ARROW_VERSION "3188d70202795d8e0a8092ec5685d859b02e366d") + set(ARROW_VERSION "e8331f46f8b324271e694557789ea53b082fdc05") else() set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}") endif() From 32a4abec39b2e3862cb40e947cf77dc6617a6008 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 13 Nov 2017 19:27:51 -0500 Subject: [PATCH 20/30] Cleanup iteration a bit --- src/parquet/arrow/writer.cc | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index e605d898..f7265315 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -797,6 +797,7 @@ Status FileWriter::Impl::TypedWriteBatch( const auto& data = static_cast(*array); const int64_t length = data.length(); + // TODO(phillipc): This is potentially very wasteful if we have a lot of nulls std::vector big_endian_values(static_cast(length) * 2); RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(FLBA), false)); @@ -808,7 +809,13 @@ Status FileWriter::Impl::TypedWriteBatch( const int32_t offset = decimal_type.byte_width() - DecimalSize(decimal_type.precision()); - if (writer->descr()->schema_node()->is_required() || data.null_count() == 0) { + const bool does_not_have_nulls = + writer->descr()->schema_node()->is_required() || data.null_count() == 0; + + // 
TODO(phillipc): Look into whether our compilers will perform loop unswitching so we + // don't have to keep writing two loops to handle the case where we know there are no + // nulls + if (does_not_have_nulls) { // no nulls, just dump the data // todo(advancedxy): use a writeBatch to avoid this step for (int64_t i = 0, j = 0; i < length; ++i, j += 2) { @@ -819,17 +826,13 @@ Status FileWriter::Impl::TypedWriteBatch( reinterpret_cast(&big_endian_values[j]) + offset); } } else { - int32_t buffer_idx = 0; - int32_t j = 0; - - for (int64_t i = 0; i < length; ++i) { + for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) { if (!data.IsNull(i)) { auto unsigned_64_bit = reinterpret_cast(data.GetValue(i)); big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); - buffer_ptr[buffer_idx] = FixedLenByteArray( + buffer_ptr[buffer_idx++] = FixedLenByteArray( reinterpret_cast(&big_endian_values[j]) + offset); - ++buffer_idx; j += 2; } } From c5c42945ae376b5fa7965dd617a78f366d044b53 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 14 Nov 2017 18:08:03 -0500 Subject: [PATCH 21/30] Fix issues --- src/parquet/arrow/reader.cc | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 5101efd8..07bd1876 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -1040,7 +1040,9 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, const int64_t length = reader->values_written(); using ElementType = typename ParquetIntegerType::c_type; - static_assert(std::is_signed::value, "ElementType is not signed"); + static_assert(std::is_same::value || + std::is_same::value, + "ElementType must be int32_t or int64_t"); const auto values = reinterpret_cast(reader->values()); @@ -1051,15 +1053,18 @@ static Status DecimalIntegerTransfer(RecordReader* reader, 
MemoryPool* pool, RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data)); uint8_t* out_ptr = data->mutable_data(); - using ::arrow::BitUtil::ToLittleEndian; + using ::arrow::BitUtil::FromLittleEndian; for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { - const ElementType value = values[i]; + // sign/zero extend int32_t values, otherwise a no-op + const auto value = static_cast(values[i]); auto out_ptr_view = reinterpret_cast(out_ptr); - out_ptr_view[0] = ToLittleEndian(static_cast(value)); - // no need to byteswap here because we're either all ones or all zeros + // No-op on little endian machines, byteswap on big endian + out_ptr_view[0] = FromLittleEndian(static_cast(value)); + + // no need to byteswap here because we're sign/zero extending exactly 8 bytes out_ptr_view[1] = static_cast(value < 0 ? -1 : 0); } From 6036ca5c73bf040df487bbdde4a474d3cc4c97c4 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 14 Nov 2017 18:52:55 -0500 Subject: [PATCH 22/30] ARROW-1811 --- src/parquet/arrow/arrow-reader-writer-test.cc | 4 ++-- src/parquet/arrow/arrow-schema-test.cc | 2 +- src/parquet/arrow/reader.cc | 16 ++++++++-------- src/parquet/arrow/schema.cc | 13 +++++++------ src/parquet/arrow/writer.cc | 6 +++--- 5 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 98c1ea21..8e5725e6 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -362,7 +362,7 @@ static std::shared_ptr MakeSimpleSchema(const ::arrow::DataType& type break; case ::arrow::Type::DECIMAL: { const auto& decimal_type = - static_cast(values_type); + static_cast(values_type); precision = decimal_type.precision(); scale = decimal_type.scale(); byte_width = DecimalSize(precision); @@ -375,7 +375,7 @@ static std::shared_ptr MakeSimpleSchema(const ::arrow::DataType& type byte_width = 
static_cast(type).byte_width(); break; case ::arrow::Type::DECIMAL: { - const auto& decimal_type = static_cast(type); + const auto& decimal_type = static_cast(type); precision = decimal_type.precision(); scale = decimal_type.scale(); byte_width = DecimalSize(precision); diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index 07e72907..7ed9ad83 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -51,7 +51,7 @@ const auto TIMESTAMP_MS = ::arrow::timestamp(TimeUnit::MILLI); const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO); const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO); const auto BINARY = ::arrow::binary(); -const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4); +const auto DECIMAL_8_4 = std::make_shared<::arrow::Decimal128Type>(8, 4); class TestConvertParquetSchema : public ::testing::Test { public: diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 07bd1876..f0190e60 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -970,7 +970,7 @@ static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_wid /// 3. Converting the big-endian bytes in the FixedSizeBinaryArray to two integers /// representing the high and low bits of each decimal value. 
template <> -struct TransferFunctor<::arrow::DecimalType, FLBAType> { +struct TransferFunctor<::arrow::Decimal128Type, FLBAType> { Status operator()(RecordReader* reader, MemoryPool* pool, const std::shared_ptr<::arrow::DataType>& type, std::shared_ptr* out) { @@ -991,7 +991,7 @@ struct TransferFunctor<::arrow::DecimalType, FLBAType> { // The byte width of each decimal value const int32_t type_length = - static_cast(*type).byte_width(); + static_cast(*type).byte_width(); // number of elements in the entire array const int64_t length = fixed_size_binary_array.length(); @@ -1046,7 +1046,7 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, const auto values = reinterpret_cast(reader->values()); - const auto& decimal_type = static_cast(*type); + const auto& decimal_type = static_cast(*type); const int64_t type_length = decimal_type.byte_width(); std::shared_ptr data; @@ -1079,7 +1079,7 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, } template <> -struct TransferFunctor<::arrow::DecimalType, Int32Type> { +struct TransferFunctor<::arrow::Decimal128Type, Int32Type> { Status operator()(RecordReader* reader, MemoryPool* pool, const std::shared_ptr<::arrow::DataType>& type, std::shared_ptr* out) { @@ -1088,7 +1088,7 @@ struct TransferFunctor<::arrow::DecimalType, Int32Type> { }; template <> -struct TransferFunctor<::arrow::DecimalType, Int64Type> { +struct TransferFunctor<::arrow::Decimal128Type, Int64Type> { Status operator()(RecordReader* reader, MemoryPool* pool, const std::shared_ptr<::arrow::DataType>& type, std::shared_ptr* out) { @@ -1157,13 +1157,13 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr* case ::arrow::Type::DECIMAL: { switch (descr_->physical_type()) { case ::parquet::Type::INT32: { - TRANSFER_DATA(::arrow::DecimalType, Int32Type); + TRANSFER_DATA(::arrow::Decimal128Type, Int32Type); } break; case ::parquet::Type::INT64: { - TRANSFER_DATA(::arrow::DecimalType, 
Int64Type); + TRANSFER_DATA(::arrow::Decimal128Type, Int64Type); } break; case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { - TRANSFER_DATA(::arrow::DecimalType, FLBAType); + TRANSFER_DATA(::arrow::Decimal128Type, FLBAType); } break; default: return Status::Invalid( diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 7027cb3a..41da9fb3 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -50,7 +50,7 @@ const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI); const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO); const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO); -TypePtr MakeDecimalType(const PrimitiveNode& node) { +TypePtr MakeDecimal128Type(const PrimitiveNode& node) { const auto& metadata = node.decimal_metadata(); return ::arrow::decimal(metadata.precision, metadata.scale); } @@ -61,7 +61,7 @@ static Status FromByteArray(const PrimitiveNode& node, TypePtr* out) { *out = ::arrow::utf8(); break; case LogicalType::DECIMAL: - *out = MakeDecimalType(node); + *out = MakeDecimal128Type(node); break; default: // BINARY @@ -77,7 +77,7 @@ static Status FromFLBA(const PrimitiveNode& node, TypePtr* out) { *out = ::arrow::fixed_size_binary(node.type_length()); break; case LogicalType::DECIMAL: - *out = MakeDecimalType(node); + *out = MakeDecimal128Type(node); break; default: std::stringstream ss; @@ -120,7 +120,7 @@ static Status FromInt32(const PrimitiveNode& node, TypePtr* out) { *out = ::arrow::time32(::arrow::TimeUnit::MILLI); break; case LogicalType::DECIMAL: - *out = MakeDecimalType(node); + *out = MakeDecimal128Type(node); break; default: std::stringstream ss; @@ -144,7 +144,7 @@ static Status FromInt64(const PrimitiveNode& node, TypePtr* out) { *out = ::arrow::uint64(); break; case LogicalType::DECIMAL: - *out = MakeDecimalType(node); + *out = MakeDecimal128Type(node); break; case LogicalType::TIMESTAMP_MILLIS: *out = TIMESTAMP_MS; @@ -542,7 +542,8 @@ Status 
FieldToNode(const std::shared_ptr& field, case ArrowType::DECIMAL: { type = ParquetType::FIXED_LEN_BYTE_ARRAY; logical_type = LogicalType::DECIMAL; - const auto& decimal_type = static_cast(*field->type()); + const auto& decimal_type = + static_cast(*field->type()); precision = decimal_type.precision(); scale = decimal_type.scale(); length = DecimalSize(precision); diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index f7265315..1f3fc7e6 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -791,7 +791,7 @@ Status FileWriter::Impl::TypedWriteBatch } template <> -Status FileWriter::Impl::TypedWriteBatch( +Status FileWriter::Impl::TypedWriteBatch( ColumnWriter* column_writer, const std::shared_ptr& array, int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) { const auto& data = static_cast(*array); @@ -805,7 +805,7 @@ Status FileWriter::Impl::TypedWriteBatch( auto writer = reinterpret_cast*>(column_writer); - const auto& decimal_type = static_cast(*data.type()); + const auto& decimal_type = static_cast(*data.type()); const int32_t offset = decimal_type.byte_width() - DecimalSize(decimal_type.precision()); @@ -947,7 +947,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) { WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType) WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType) WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType) - WRITE_BATCH_CASE(DECIMAL, DecimalType, FLBAType) + WRITE_BATCH_CASE(DECIMAL, Decimal128Type, FLBAType) WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type) WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type) WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type) From 16935debe02bcd69dcf3da640dea89fa82d319da Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 14 Nov 2017 19:06:51 -0500 Subject: [PATCH 23/30] Reverse operand order and explicit cast --- src/parquet/arrow/arrow-reader-writer-test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 8e5725e6..4fad37d2 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -914,12 +914,12 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { const bool value_is_valid = values->IsValid(i); const bool expected_value_is_valid = expected.IsValid(i); - ASSERT_EQ(value_is_valid, expected_value_is_valid); + ASSERT_EQ(expected_value_is_valid, value_is_valid); if (value_is_valid) { uint32_t value = values->Value(i); int64_t expected_value = expected.Value(i); - ASSERT_EQ(value, expected_value); + ASSERT_EQ(expected_value, static_cast(value)); } } } From da0a7eb73dcad3233b9c47b69eb826ac90a9a31b Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Wed, 15 Nov 2017 11:35:07 -0500 Subject: [PATCH 24/30] Update for ARROW-1811 --- cmake_modules/ThirdpartyToolchain.cmake | 2 +- src/parquet/arrow/arrow-reader-writer-test.cc | 2 +- src/parquet/arrow/test-util.h | 14 ++++++++------ 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index 6f3ee627..b3d773de 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -366,7 +366,7 @@ if (NOT ARROW_FOUND) -DARROW_BUILD_TESTS=OFF) if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "") - set(ARROW_VERSION "e8331f46f8b324271e694557789ea53b082fdc05") + set(ARROW_VERSION "9fb806ce2ca0ccdee1b89c510dcfae16996cf243") else() set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}") endif() diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 4fad37d2..4b6bcbd2 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -1972,7 +1972,7 @@ TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) { std::shared_ptr expected_array; - ::arrow::DecimalBuilder 
builder(decimal_type, pool); + ::arrow::Decimal128Builder builder(decimal_type, pool); for (int32_t i = 0; i < expected_length; ++i) { ::arrow::Decimal128 value((i + 1) * 100); diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index c7894a1b..8611a303 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -34,8 +34,8 @@ template struct DecimalWithPrecisionAndScale { static_assert(PRECISION >= 1 && PRECISION <= 38, "Invalid precision value"); - using type = ::arrow::DecimalType; - static constexpr ::arrow::Type::type type_id = ::arrow::DecimalType::type_id; + using type = ::arrow::Decimal128Type; + static constexpr ::arrow::Type::type type_id = ::arrow::Decimal128Type::type_id; static constexpr int32_t precision = PRECISION; static constexpr int32_t scale = PRECISION - 1; }; @@ -156,8 +156,9 @@ NonNullArray(size_t size, std::shared_ptr* out) { constexpr int32_t kDecimalScale = DecimalWithPrecisionAndScale::scale; const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale); - ::arrow::DecimalBuilder builder(type); - const int32_t byte_width = static_cast(*type).byte_width(); + ::arrow::Decimal128Builder builder(type); + const int32_t byte_width = + static_cast(*type).byte_width(); constexpr int32_t seed = 0; @@ -317,7 +318,8 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, constexpr int32_t kDecimalPrecision = precision; constexpr int32_t kDecimalScale = DecimalWithPrecisionAndScale::scale; const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale); - const int32_t byte_width = static_cast(*type).byte_width(); + const int32_t byte_width = + static_cast(*type).byte_width(); std::shared_ptr<::arrow::Buffer> out_buf; RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * byte_width, @@ -325,7 +327,7 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, random_decimals(size, seed, precision, out_buf->mutable_data()); - ::arrow::DecimalBuilder builder(type); 
+ ::arrow::Decimal128Builder builder(type); RETURN_NOT_OK(builder.Append(out_buf->data(), size, valid_bytes.data())); return builder.Finish(out); } From e25c59bdd1fdb763679c8f55ee1e14ff0526349a Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Sat, 18 Nov 2017 15:22:18 -0500 Subject: [PATCH 25/30] Fix reader writer test for unique kernel addition --- src/parquet/arrow/arrow-reader-writer-test.cc | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 4b6bcbd2..0e0831ec 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -24,6 +24,7 @@ #include "gtest/gtest.h" #include +#include #include "parquet/api/reader.h" #include "parquet/api/writer.h" @@ -44,7 +45,6 @@ using arrow::ArrayVisitor; using arrow::Buffer; using arrow::ChunkedArray; using arrow::Column; -using arrow::EncodeArrayToDictionary; using arrow::ListArray; using arrow::PoolBuffer; using arrow::PrimitiveArray; @@ -52,6 +52,9 @@ using arrow::Status; using arrow::Table; using arrow::TimeUnit; using arrow::default_memory_pool; +using arrow::compute::DictionaryEncode; +using arrow::compute::FunctionContext; +using arrow::compute::Datum; using arrow::io::BufferReader; using arrow::test::randint; @@ -619,8 +622,10 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) { ASSERT_OK(NullableArray(SMALL_SIZE, 10, kDefaultSeed, &values)); - std::shared_ptr dict_values; - ASSERT_OK(EncodeArrayToDictionary(*values, default_memory_pool(), &dict_values)); + Datum out; + FunctionContext ctx(default_memory_pool()); + ASSERT_OK(DictionaryEncode(&ctx, Datum(values), &out)); + std::shared_ptr dict_values = MakeArray(out.array()); std::shared_ptr schema = MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL); this->WriteColumn(schema, dict_values); From 51965cd5699585394f4c0c91d774c5f79a263dbd Mon Sep 17 00:00:00 2001 From: 
Phillip Cloud Date: Sat, 18 Nov 2017 15:22:37 -0500 Subject: [PATCH 26/30] Min commit that contains the unique kernel in arrow --- cmake_modules/ThirdpartyToolchain.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index b3d773de..fe1d4999 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -366,7 +366,7 @@ if (NOT ARROW_FOUND) -DARROW_BUILD_TESTS=OFF) if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "") - set(ARROW_VERSION "9fb806ce2ca0ccdee1b89c510dcfae16996cf243") + set(ARROW_VERSION "f2806fa518583907a129b2ecb0b7ec8758b69e17") else() set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}") endif() From 83948ec4943f479e39462400a491542bdc60fa7f Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Sat, 18 Nov 2017 15:24:38 -0500 Subject: [PATCH 27/30] Add last_value_ init --- src/parquet/encoding-internal.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h index 98b50489..3284aca6 100644 --- a/src/parquet/encoding-internal.h +++ b/src/parquet/encoding-internal.h @@ -921,7 +921,8 @@ class DeltaByteArrayDecoder : public Decoder { ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) : Decoder(descr, Encoding::DELTA_BYTE_ARRAY), prefix_len_decoder_(nullptr, pool), - suffix_decoder_(nullptr, pool) {} + suffix_decoder_(nullptr, pool), + last_value_(0, nullptr) {} virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; From e4b02d33c731c42507701c41276d9a46229e8824 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Sat, 18 Nov 2017 15:33:18 -0500 Subject: [PATCH 28/30] Refactor types.h --- src/parquet/types.h | 91 +++++++++++++++++++-------------------------- 1 file changed, 38 insertions(+), 53 deletions(-) diff --git a/src/parquet/types.h b/src/parquet/types.h index af3a58f5..53b33d56 100644 --- 
a/src/parquet/types.h +++ b/src/parquet/types.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -136,15 +137,15 @@ struct ByteArray { ByteArray(uint32_t len, const uint8_t* ptr) : len(len), ptr(ptr) {} uint32_t len; const uint8_t* ptr; +}; - bool operator==(const ByteArray& other) const { - return this->len == other.len && 0 == memcmp(this->ptr, other.ptr, this->len); - } +inline bool operator==(const ByteArray& left, const ByteArray& right) { + return left.len == right.len && std::equal(left.ptr, left.ptr + left.len, right.ptr); +} - bool operator!=(const ByteArray& other) const { - return this->len != other.len || 0 != memcmp(this->ptr, other.ptr, this->len); - } -}; +inline bool operator!=(const ByteArray& left, const ByteArray& right) { + return !(left == right); +} struct FixedLenByteArray { FixedLenByteArray() : ptr(nullptr) {} @@ -152,63 +153,47 @@ struct FixedLenByteArray { const uint8_t* ptr; }; -typedef FixedLenByteArray FLBA; +using FLBA = FixedLenByteArray; -MANUALLY_ALIGNED_STRUCT(1) Int96 { - uint32_t value[3]; +MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; }; +STRUCT_END(Int96, 12); - bool operator==(const Int96& other) const { - return 0 == memcmp(this->value, other.value, 3 * sizeof(uint32_t)); - } +inline bool operator==(const Int96& left, const Int96& right) { + return std::equal(left.value, left.value + 3, right.value); +} - bool operator!=(const Int96& other) const { return !(*this == other); } -}; -STRUCT_END(Int96, 12); +inline bool operator!=(const Int96& left, const Int96& right) { return !(left == right); } static inline std::string ByteArrayToString(const ByteArray& a) { return std::string(reinterpret_cast(a.ptr), a.len); } static inline std::string Int96ToString(const Int96& a) { - std::stringstream result; - for (int i = 0; i < 3; i++) { - result << a.value[i] << " "; - } + std::ostringstream result; + std::copy(a.value, a.value + 3, std::ostream_iterator(result, " ")); return result.str(); } static 
inline std::string FixedLenByteArrayToString(const FixedLenByteArray& a, int len) { - const uint8_t* bytes = reinterpret_cast(a.ptr); - std::stringstream result; - for (int i = 0; i < len; i++) { - result << (uint32_t)bytes[i] << " "; - } + std::ostringstream result; + std::copy(a.ptr, a.ptr + len, std::ostream_iterator(result, " ")); return result.str(); } -static inline int ByteCompare(const ByteArray& x1, const ByteArray& x2) { - uint32_t len = std::min(x1.len, x2.len); - int cmp = memcmp(x1.ptr, x2.ptr, len); - if (cmp != 0) return cmp; - if (len < x1.len) return 1; - if (len < x2.len) return -1; - return 0; -} - -template +template struct type_traits {}; template <> struct type_traits { - typedef bool value_type; - static constexpr int value_byte_size = 1; + using value_type = bool; + static constexpr int value_byte_size = 1; static constexpr const char* printf_code = "d"; }; template <> struct type_traits { - typedef int32_t value_type; + using value_type = int32_t; static constexpr int value_byte_size = 4; static constexpr const char* printf_code = "d"; @@ -216,7 +201,7 @@ struct type_traits { template <> struct type_traits { - typedef int64_t value_type; + using value_type = int64_t; static constexpr int value_byte_size = 8; static constexpr const char* printf_code = "ld"; @@ -224,7 +209,7 @@ struct type_traits { template <> struct type_traits { - typedef Int96 value_type; + using value_type = Int96; static constexpr int value_byte_size = 12; static constexpr const char* printf_code = "s"; @@ -232,7 +217,7 @@ struct type_traits { template <> struct type_traits { - typedef float value_type; + using value_type = float; static constexpr int value_byte_size = 4; static constexpr const char* printf_code = "f"; @@ -240,7 +225,7 @@ struct type_traits { template <> struct type_traits { - typedef double value_type; + using value_type = double; static constexpr int value_byte_size = 8; static constexpr const char* printf_code = "lf"; @@ -248,7 +233,7 @@ struct 
type_traits { template <> struct type_traits { - typedef ByteArray value_type; + using value_type = ByteArray; static constexpr int value_byte_size = sizeof(ByteArray); static constexpr const char* printf_code = "s"; @@ -256,7 +241,7 @@ struct type_traits { template <> struct type_traits { - typedef FixedLenByteArray value_type; + using value_type = FixedLenByteArray; static constexpr int value_byte_size = sizeof(FixedLenByteArray); static constexpr const char* printf_code = "s"; @@ -264,18 +249,18 @@ struct type_traits { template struct DataType { + using c_type = typename type_traits::value_type; static constexpr Type::type type_num = TYPE; - typedef typename type_traits::value_type c_type; }; -typedef DataType BooleanType; -typedef DataType Int32Type; -typedef DataType Int64Type; -typedef DataType Int96Type; -typedef DataType FloatType; -typedef DataType DoubleType; -typedef DataType ByteArrayType; -typedef DataType FLBAType; +using BooleanType = DataType; +using Int32Type = DataType; +using Int64Type = DataType; +using Int96Type = DataType; +using FloatType = DataType; +using DoubleType = DataType; +using ByteArrayType = DataType; +using FLBAType = DataType; template inline std::string format_fwf(int width) { From 63018bcdf23e3018c9277547d837fee20c62e471 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 19 Nov 2017 13:40:54 -0500 Subject: [PATCH 29/30] Suppress C4996 due to arrow/util/variant.h --- CMakeLists.txt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5aa8b930..c524ceb5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -460,6 +460,11 @@ if ("${COMPILER_FAMILY}" STREQUAL "clang") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_CLANG_OPTIONS}") endif() +if ("${COMPILER_FAMILY}" STREQUAL "msvc") + # MSVC version of -Wno-deprecated + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4996") +endif() + ############################################################ # "make lint" target 
############################################################ From 8c3d2229ed0b16c8eab2ecec6c68ae241c898f69 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Sun, 19 Nov 2017 17:29:15 -0500 Subject: [PATCH 30/30] Remove loop from BytesToInteger --- src/parquet/arrow/reader.cc | 56 ++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index f0190e60..3ca49cb4 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -877,38 +877,54 @@ struct TransferFunctor< }; static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) { + using ::arrow::BitUtil::FromBigEndian; + const int32_t length = stop - start; DCHECK_GE(length, 0); DCHECK_LE(length, 8); switch (length) { - // We can forego the loop if the number of bytes to convert is a power of two case 0: return 0; + case 1: + return bytes[start]; case 2: - return ::arrow::BitUtil::FromBigEndian( - *reinterpret_cast(bytes + start)); + return FromBigEndian(*reinterpret_cast(bytes + start)); + case 3: { + const uint64_t first_two_bytes = + FromBigEndian(*reinterpret_cast(bytes + start)); + const uint64_t last_byte = bytes[stop - 1]; + return first_two_bytes << 8 | last_byte; + } case 4: - return ::arrow::BitUtil::FromBigEndian( - *reinterpret_cast(bytes + start)); + return FromBigEndian(*reinterpret_cast(bytes + start)); + case 5: { + const uint64_t first_four_bytes = + FromBigEndian(*reinterpret_cast(bytes + start)); + const uint64_t last_byte = bytes[stop - 1]; + return first_four_bytes << 8 | last_byte; + } + case 6: { + const uint64_t first_four_bytes = + FromBigEndian(*reinterpret_cast(bytes + start)); + const uint64_t last_two_bytes = + FromBigEndian(*reinterpret_cast(bytes + start + 4)); + return first_four_bytes << 16 | last_two_bytes; + } + case 7: { + const uint64_t first_four_bytes = + FromBigEndian(*reinterpret_cast(bytes + start)); + const uint64_t second_two_bytes = + 
FromBigEndian(*reinterpret_cast(bytes + start + 4)); + const uint64_t last_byte = bytes[stop - 1]; + return first_four_bytes << 24 | second_two_bytes << 8 | last_byte; + } case 8: - return ::arrow::BitUtil::FromBigEndian( - *reinterpret_cast(bytes + start)); + return FromBigEndian(*reinterpret_cast(bytes + start)); default: { - // Take a slower path for non power-of-2 number of bytes - uint64_t value = 0; - - const auto unsigned_stop = static_cast(stop); - - for (int32_t i = start; i < stop; ++i) { - const uint64_t bits_to_shift = (unsigned_stop - i - 1) * CHAR_BIT; - const uint64_t byte_value = bytes[i]; - const uint64_t shifted_value = byte_value << bits_to_shift; - value |= shifted_value; - } - - return value; + DCHECK(false); + return UINT64_MAX; } } }