diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 9c74abab530..1b3e0badfb1 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -795,9 +795,10 @@ class ParquetIOTestBase : public ::testing::Test { class TestReadDecimals : public ParquetIOTestBase { public: - void CheckReadFromByteArrays(const std::shared_ptr& logical_type, - const std::vector>& values, - const Array& expected) { + void CheckReadFromByteArrays( + const std::shared_ptr& logical_type, + const std::vector>& values, const Array& expected, + ArrowReaderProperties properties = default_arrow_reader_properties()) { std::vector byte_arrays(values.size()); std::transform(values.begin(), values.end(), byte_arrays.begin(), [](const std::vector& bytes) { @@ -822,7 +823,6 @@ class TestReadDecimals : public ParquetIOTestBase { // The binary_type setting shouldn't affect the results for (auto binary_type : {::arrow::Type::BINARY, ::arrow::Type::LARGE_BINARY, ::arrow::Type::BINARY_VIEW}) { - ArrowReaderProperties properties; properties.set_binary_type(binary_type); ASSERT_OK_AND_ASSIGN(auto reader, ReaderFromBuffer(buffer, properties)); ReadAndCheckSingleColumnFile(std::move(reader), expected); @@ -833,6 +833,44 @@ class TestReadDecimals : public ParquetIOTestBase { // The Decimal roundtrip tests always go through the FixedLenByteArray path, // check the ByteArray case manually. 
+TEST_F(TestReadDecimals, Decimal32ByteArray) { + const std::vector> big_endian_decimals = { + // 123456 + {1, 226, 64}, + // 987654 + {15, 18, 6}, + // -123456 + {255, 254, 29, 192}, + }; + + ArrowReaderProperties properties = default_arrow_reader_properties(); + properties.set_smallest_decimal_enabled(true); + + auto expected = + ArrayFromJSON(::arrow::decimal32(6, 3), R"(["123.456", "987.654", "-123.456"])"); + CheckReadFromByteArrays(LogicalType::Decimal(6, 3), big_endian_decimals, *expected, + properties); +} + +TEST_F(TestReadDecimals, Decimal64ByteArray) { + const std::vector> big_endian_decimals = { + // 123456 + {1, 226, 64}, + // 987654 + {15, 18, 6}, + // -123456 + {255, 255, 255, 255, 255, 254, 29, 192}, + }; + + ArrowReaderProperties properties = default_arrow_reader_properties(); + properties.set_smallest_decimal_enabled(true); + + auto expected = + ArrayFromJSON(::arrow::decimal64(16, 3), R"(["123.456", "987.654", "-123.456"])"); + CheckReadFromByteArrays(LogicalType::Decimal(16, 3), big_endian_decimals, *expected, + properties); +} + TEST_F(TestReadDecimals, Decimal128ByteArray) { const std::vector> big_endian_decimals = { // 123456 @@ -3044,18 +3082,19 @@ TEST(ArrowReadWrite, NestedRequiredField) { /*row_group_size=*/8); } -TEST(ArrowReadWrite, Decimal256) { - using ::arrow::Decimal256; +TEST(ArrowReadWrite, Decimal) { using ::arrow::field; - auto type = ::arrow::decimal256(8, 4); - const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", "-9999.9999", "9999.9999"])"; - auto array = ::arrow::ArrayFromJSON(type, json); - auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); - auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); - CheckSimpleRoundtrip(table, 2, props_store_schema); + + for (auto type : {::arrow::decimal32(8, 4), ::arrow::decimal64(8, 4), + ::arrow::decimal128(8, 4), ::arrow::decimal256(8, 4)}) { + auto array = ::arrow::ArrayFromJSON(type, json); + auto table = 
::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); + auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); + CheckSimpleRoundtrip(table, 2, props_store_schema); + } } TEST(ArrowReadWrite, DecimalStats) { @@ -5468,6 +5507,64 @@ TYPED_TEST(TestIntegerAnnotateDecimalTypeParquetIO, SingleNullableDecimalColumn) ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values)); } +template +class TestIntegerAnnotateSmallestDecimalTypeParquetIO + : public TestIntegerAnnotateDecimalTypeParquetIO { + public: + void ReadAndCheckSingleDecimalColumnFile(const Array& values) { + ArrowReaderProperties properties = default_arrow_reader_properties(); + properties.set_smallest_decimal_enabled(true); + + std::shared_ptr out; + std::unique_ptr reader; + this->ReaderFromSink(&reader, properties); + this->ReadSingleColumnFile(std::move(reader), &out); + + if (values.type()->id() == out->type()->id()) { + AssertArraysEqual(values, *out); + } else { + auto decimal_type = checked_pointer_cast<::arrow::DecimalType>(values.type()); + + ASSERT_OK_AND_ASSIGN( + const auto expected_values, + ::arrow::compute::Cast(values, ::arrow::decimal256(decimal_type->precision(), + decimal_type->scale()))); + ASSERT_OK_AND_ASSIGN( + const auto out_values, + ::arrow::compute::Cast(*out, ::arrow::decimal256(decimal_type->precision(), + decimal_type->scale()))); + + ASSERT_EQ(expected_values->length(), out_values->length()); + ASSERT_EQ(expected_values->null_count(), out_values->null_count()); + ASSERT_TRUE(expected_values->Equals(*out_values)); + } + } +}; + +using SmallestDecimalTestTypes = ::testing::Types< + Decimal32WithPrecisionAndScale<9>, Decimal64WithPrecisionAndScale<9>, + Decimal64WithPrecisionAndScale<18>, Decimal128WithPrecisionAndScale<9>, + Decimal128WithPrecisionAndScale<18>, Decimal256WithPrecisionAndScale<9>, + Decimal256WithPrecisionAndScale<18>>; + +TYPED_TEST_SUITE(TestIntegerAnnotateSmallestDecimalTypeParquetIO, + 
SmallestDecimalTestTypes); + +TYPED_TEST(TestIntegerAnnotateSmallestDecimalTypeParquetIO, + SingleNonNullableDecimalColumn) { + std::shared_ptr values; + ASSERT_OK(NonNullArray(SMALL_SIZE, &values)); + ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values)); + ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values)); +} + +TYPED_TEST(TestIntegerAnnotateSmallestDecimalTypeParquetIO, SingleNullableDecimalColumn) { + std::shared_ptr values; + ASSERT_OK(NullableArray(SMALL_SIZE, SMALL_SIZE / 2, kDefaultSeed, &values)); + ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values)); + ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values)); +} + template class TestBufferedParquetIO : public TestParquetIO { public: diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 6bf1e4f1e4d..b622e93e072 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -69,6 +69,12 @@ using arrow::Decimal128Type; using arrow::Decimal256; using arrow::Decimal256Array; using arrow::Decimal256Type; +using arrow::Decimal32; +using arrow::Decimal32Array; +using arrow::Decimal32Type; +using arrow::Decimal64; +using arrow::Decimal64Array; +using arrow::Decimal64Type; using arrow::Field; using arrow::Int32Array; using arrow::ListArray; @@ -153,7 +159,8 @@ static Status FromInt32Statistics(const Int32Statistics& statistics, const LogicalType& logical_type, std::shared_ptr<::arrow::Scalar>* min, std::shared_ptr<::arrow::Scalar>* max) { - ARROW_ASSIGN_OR_RAISE(auto type, FromInt32(logical_type)); + ARROW_ASSIGN_OR_RAISE(auto type, + FromInt32(logical_type, default_arrow_reader_properties())); switch (logical_type.type()) { case LogicalType::Type::INT: @@ -175,7 +182,8 @@ static Status FromInt64Statistics(const Int64Statistics& statistics, const LogicalType& logical_type, std::shared_ptr<::arrow::Scalar>* min, std::shared_ptr<::arrow::Scalar>* max) { - ARROW_ASSIGN_OR_RAISE(auto type, 
FromInt64(logical_type)); + ARROW_ASSIGN_OR_RAISE(auto type, + FromInt64(logical_type, default_arrow_reader_properties())); switch (logical_type.type()) { case LogicalType::Type::INT: @@ -600,17 +608,10 @@ Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width, return ::arrow::Status::OK(); } -template -struct DecimalTypeTrait; - -template <> -struct DecimalTypeTrait<::arrow::Decimal128Array> { - using value = ::arrow::Decimal128; -}; - -template <> -struct DecimalTypeTrait<::arrow::Decimal256Array> { - using value = ::arrow::Decimal256; +template > +struct DecimalTypeTrait { + using value = typename ::arrow::TypeTraits::CType; }; template @@ -725,16 +726,17 @@ struct DecimalConverter { /// The parquet spec allows systems to write decimals in int32, int64 if the values are /// small enough to fit in less 4 bytes or less than 8 bytes, respectively. /// This function implements the conversion from int32 and int64 arrays to decimal arrays. -template < - typename DecimalArrayType, typename ParquetIntegerType, - typename = ::arrow::enable_if_t::value || - std::is_same::value>> +template ::value>> static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out) { - // Decimal128 and Decimal256 are only Arrow constructs. Parquet does not - // specifically distinguish between decimal byte widths. - DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128 || - field->type()->id() == ::arrow::Type::DECIMAL256); + using ArrayTypeClass = typename DecimalArrayType::TypeClass; + using DecimalValue = typename DecimalTypeTrait::value; + + // Decimal32, Decimal64, Decimal128 and Decimal256 are only Arrow constructs. Parquet + // does not specifically distinguish between decimal byte widths. 
+ DCHECK(field->type()->id() == ArrayTypeClass::type_id); const int64_t length = reader->values_written(); @@ -745,25 +747,17 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, const auto values = reinterpret_cast(reader->values()); - const auto& decimal_type = checked_cast(*field->type()); + const auto& decimal_type = checked_cast(*field->type()); const int64_t type_length = decimal_type.byte_width(); ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool)); uint8_t* out_ptr = data->mutable_data(); - using ::arrow::bit_util::FromLittleEndian; - for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { // sign/zero extend int32_t values, otherwise a no-op const auto value = static_cast(values[i]); - - if constexpr (std::is_same_v) { - ::arrow::Decimal128 decimal(value); - decimal.ToBytes(out_ptr); - } else { - ::arrow::Decimal256 decimal(value); - decimal.ToBytes(out_ptr); - } + DecimalValue decimal(value); + decimal.ToBytes(out_ptr); } if (reader->nullable_values() && field->nullable()) { @@ -802,6 +796,33 @@ Status TransferDecimal(RecordReader* reader, MemoryPool* pool, return Status::OK(); } +template +Status TransferDecimalTo(Type::type physical_type, Args&&... 
args) { + switch (physical_type) { + case ::parquet::Type::INT32: { + auto fn = DecimalIntegerTransfer; + RETURN_NOT_OK(fn(std::forward(args)...)); + } break; + case ::parquet::Type::INT64: { + auto fn = DecimalIntegerTransfer; + RETURN_NOT_OK(fn(std::forward(args)...)); + } break; + case ::parquet::Type::BYTE_ARRAY: { + auto fn = TransferDecimal; + RETURN_NOT_OK(fn(std::forward(args)...)); + } break; + case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { + auto fn = TransferDecimal; + RETURN_NOT_OK(fn(std::forward(args)...)); + } break; + default: + return Status::Invalid( + "Physical type for decimal must be int32, int64, byte array, or fixed length " + "binary"); + } + return Status::OK(); +} + Status TransferHalfFloat(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out) { static const auto binary_type = ::arrow::fixed_size_binary(2); @@ -902,55 +923,22 @@ Status TransferColumnData(RecordReader* reader, } RETURN_NOT_OK(TransferHalfFloat(reader, pool, value_field, &result)); } break; + case ::arrow::Type::DECIMAL32: { + RETURN_NOT_OK(TransferDecimalTo(descr->physical_type(), reader, + pool, value_field, &result)); + } break; + case ::arrow::Type::DECIMAL64: { + RETURN_NOT_OK(TransferDecimalTo(descr->physical_type(), reader, + pool, value_field, &result)); + } break; case ::arrow::Type::DECIMAL128: { - switch (descr->physical_type()) { - case ::parquet::Type::INT32: { - auto fn = DecimalIntegerTransfer; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - case ::parquet::Type::INT64: { - auto fn = &DecimalIntegerTransfer; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - case ::parquet::Type::BYTE_ARRAY: { - auto fn = &TransferDecimal; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { - auto fn = &TransferDecimal; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - default: - return Status::Invalid( - "Physical type for 
decimal128 must be int32, int64, byte array, or fixed " - "length binary"); - } + RETURN_NOT_OK(TransferDecimalTo(descr->physical_type(), reader, + pool, value_field, &result)); + } break; + case ::arrow::Type::DECIMAL256: { + RETURN_NOT_OK(TransferDecimalTo(descr->physical_type(), reader, + pool, value_field, &result)); } break; - case ::arrow::Type::DECIMAL256: - switch (descr->physical_type()) { - case ::parquet::Type::INT32: { - auto fn = DecimalIntegerTransfer; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - case ::parquet::Type::INT64: { - auto fn = &DecimalIntegerTransfer; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - case ::parquet::Type::BYTE_ARRAY: { - auto fn = &TransferDecimal; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { - auto fn = &TransferDecimal; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - default: - return Status::Invalid( - "Physical type for decimal256 must be int32, int64, byte array, or fixed " - "length binary"); - } - break; - case ::arrow::Type::TIMESTAMP: { const ::arrow::TimestampType& timestamp_type = checked_cast<::arrow::TimestampType&>(*value_field->type()); diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 2beb2c66efc..293ae94b94d 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -392,13 +392,15 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, static_cast(*field->type()); length = fixed_size_binary_type.byte_width(); } break; + case ArrowTypeId::DECIMAL32: + case ArrowTypeId::DECIMAL64: case ArrowTypeId::DECIMAL128: case ArrowTypeId::DECIMAL256: { const auto& decimal_type = static_cast(*field->type()); precision = decimal_type.precision(); scale = decimal_type.scale(); if (properties.store_decimal_as_integer() && 1 <= precision && precision <= 18) { - type = precision <= 9 ? 
ParquetType ::INT32 : ParquetType ::INT64; + type = precision <= 9 ? ParquetType::INT32 : ParquetType::INT64; } else { type = ParquetType::FIXED_LEN_BYTE_ARRAY; length = DecimalType::DecimalSize(precision); @@ -1076,10 +1078,16 @@ Result ApplyOriginalStorageMetadata(const Field& origin_field, modified = true; } - if (origin_type->id() == ::arrow::Type::DECIMAL256 && - inferred_type->id() == ::arrow::Type::DECIMAL128) { - inferred->field = inferred->field->WithType(origin_type); - modified = true; + if (::arrow::is_decimal(origin_type->id()) && + ::arrow::is_decimal(inferred_type->id())) { + auto& origin_decimal = checked_cast(*origin_type); + auto& inferred_decimal = checked_cast(*inferred_type); + if (origin_decimal.precision() == inferred_decimal.precision() && + origin_decimal.scale() == inferred_decimal.scale() && + origin_decimal.id() != inferred_decimal.id()) { + inferred->field = inferred->field->WithType(origin_type); + modified = true; + } } // Restore field metadata diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index 72b8f0d992b..2e8cf764b27 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -39,8 +39,12 @@ using ::arrow::internal::checked_cast; namespace { -Result> MakeArrowDecimal(const LogicalType& logical_type) { +Result> MakeArrowDecimal(const LogicalType& logical_type, + bool smallest_decimal_enabled) { const auto& decimal = checked_cast(logical_type); + if (smallest_decimal_enabled) { + return ::arrow::smallest_decimal(decimal.precision(), decimal.scale()); + } if (decimal.precision() <= ::arrow::Decimal128Type::kMaxPrecision) { return ::arrow::Decimal128Type::Make(decimal.precision(), decimal.scale()); } @@ -154,7 +158,7 @@ Result> FromByteArray( case LogicalType::Type::STRING: return utf8_type(); case LogicalType::Type::DECIMAL: - return MakeArrowDecimal(logical_type); + return MakeArrowDecimal(logical_type, 
reader_properties.smallest_decimal_enabled()); case LogicalType::Type::NONE: case LogicalType::Type::ENUM: case LogicalType::Type::BSON: @@ -192,7 +196,7 @@ Result> FromFLBA( const ArrowReaderProperties& reader_properties) { switch (logical_type.type()) { case LogicalType::Type::DECIMAL: - return MakeArrowDecimal(logical_type); + return MakeArrowDecimal(logical_type, reader_properties.smallest_decimal_enabled()); case LogicalType::Type::FLOAT16: return ::arrow::float16(); case LogicalType::Type::NONE: @@ -212,7 +216,8 @@ Result> FromFLBA( } // namespace -::arrow::Result> FromInt32(const LogicalType& logical_type) { +::arrow::Result> FromInt32( + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties) { switch (logical_type.type()) { case LogicalType::Type::INT: return MakeArrowInt(logical_type); @@ -221,7 +226,7 @@ ::arrow::Result> FromInt32(const LogicalType& logical case LogicalType::Type::TIME: return MakeArrowTime32(logical_type); case LogicalType::Type::DECIMAL: - return MakeArrowDecimal(logical_type); + return MakeArrowDecimal(logical_type, reader_properties.smallest_decimal_enabled()); case LogicalType::Type::NONE: return ::arrow::int32(); default: @@ -230,12 +235,13 @@ ::arrow::Result> FromInt32(const LogicalType& logical } } -Result> FromInt64(const LogicalType& logical_type) { +Result> FromInt64( + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties) { switch (logical_type.type()) { case LogicalType::Type::INT: return MakeArrowInt64(logical_type); case LogicalType::Type::DECIMAL: - return MakeArrowDecimal(logical_type); + return MakeArrowDecimal(logical_type, reader_properties.smallest_decimal_enabled()); case LogicalType::Type::TIMESTAMP: return MakeArrowTimestamp(logical_type); case LogicalType::Type::TIME: @@ -265,9 +271,9 @@ Result> GetArrowType( case ParquetType::BOOLEAN: return ::arrow::boolean(); case ParquetType::INT32: - return FromInt32(logical_type); + return FromInt32(logical_type, 
reader_properties); case ParquetType::INT64: - return FromInt64(logical_type); + return FromInt64(logical_type, reader_properties); case ParquetType::INT96: return ::arrow::timestamp(reader_properties.coerce_int96_timestamp_unit()); case ParquetType::FLOAT: diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h index 087b16755b0..09ad891aad3 100644 --- a/cpp/src/parquet/arrow/schema_internal.h +++ b/cpp/src/parquet/arrow/schema_internal.h @@ -25,10 +25,10 @@ namespace parquet::arrow { using ::arrow::Result; -Result> FromFLBA(const LogicalType& logical_type, - int32_t physical_length); -Result> FromInt32(const LogicalType& logical_type); -Result> FromInt64(const LogicalType& logical_type); +Result> FromInt32( + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties); +Result> FromInt64( + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties); Result> GetArrowType( Type::type physical_type, const LogicalType& logical_type, int type_length, diff --git a/cpp/src/parquet/arrow/test_util.h b/cpp/src/parquet/arrow/test_util.h index cfc57ce6ea7..05f6fd24ac0 100644 --- a/cpp/src/parquet/arrow/test_util.h +++ b/cpp/src/parquet/arrow/test_util.h @@ -47,25 +47,27 @@ using ::arrow::Array; using ::arrow::ChunkedArray; using ::arrow::Status; -template -struct Decimal128WithPrecisionAndScale { - static_assert(PRECISION >= 1 && PRECISION <= 38, "Invalid precision value"); - - using type = ::arrow::Decimal128Type; - static constexpr ::arrow::Type::type type_id = ::arrow::Decimal128Type::type_id; +template > +struct DecimalWithPrecisionAndScale { + using type = T; + static_assert(PRECISION >= T::kMinPrecision && PRECISION <= T::kMaxPrecision, + "Invalid precision value"); + static constexpr ::arrow::Type::type type_id = T::type_id; static constexpr int32_t precision = PRECISION; static constexpr int32_t scale = PRECISION - 1; }; - template -struct Decimal256WithPrecisionAndScale { - 
static_assert(PRECISION >= 1 && PRECISION <= 76, "Invalid precision value"); - - using type = ::arrow::Decimal256Type; - static constexpr ::arrow::Type::type type_id = ::arrow::Decimal256Type::type_id; - static constexpr int32_t precision = PRECISION; - static constexpr int32_t scale = PRECISION - 1; -}; +using Decimal32WithPrecisionAndScale = + DecimalWithPrecisionAndScale<::arrow::Decimal32Type, PRECISION>; +template +using Decimal64WithPrecisionAndScale = + DecimalWithPrecisionAndScale<::arrow::Decimal64Type, PRECISION>; +template +using Decimal128WithPrecisionAndScale = + DecimalWithPrecisionAndScale<::arrow::Decimal128Type, PRECISION>; +template +using Decimal256WithPrecisionAndScale = + DecimalWithPrecisionAndScale<::arrow::Decimal256Type, PRECISION>; template ::arrow::enable_if_floating_point NonNullArray( @@ -155,45 +157,25 @@ static void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t } template -::arrow::enable_if_t< - std::is_same>::value, Status> +::arrow::enable_if_t>, + Status> NonNullArray(size_t size, std::shared_ptr* out) { constexpr int32_t kDecimalPrecision = precision; - constexpr int32_t kDecimalScale = Decimal128WithPrecisionAndScale::scale; + constexpr int32_t kDecimalScale = ArrowType::scale; - const auto type = ::arrow::decimal128(kDecimalPrecision, kDecimalScale); - ::arrow::Decimal128Builder builder(type); - const int32_t byte_width = - static_cast(*type).byte_width(); + const auto type = + std::make_shared(kDecimalPrecision, kDecimalScale); + const int32_t byte_width = type->byte_width(); constexpr int32_t seed = 0; ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, kDecimalPrecision, - out_buf->mutable_data()); - - RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size)); - return builder.Finish(out); -} - -template -::arrow::enable_if_t< - std::is_same>::value, Status> -NonNullArray(size_t size, std::shared_ptr* out) 
{ - constexpr int32_t kDecimalPrecision = precision; - constexpr int32_t kDecimalScale = Decimal256WithPrecisionAndScale::scale; - - const auto type = ::arrow::decimal256(kDecimalPrecision, kDecimalScale); - ::arrow::Decimal256Builder builder(type); - const int32_t byte_width = - static_cast(*type).byte_width(); - - constexpr int32_t seed = 0; - - ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, kDecimalPrecision, - out_buf->mutable_data()); + random_decimals(size, seed, kDecimalPrecision, + out_buf->mutable_data()); + using Builder = typename ::arrow::TypeTraits::BuilderType; + Builder builder(type); RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size)); return builder.Finish(out); } @@ -344,8 +326,9 @@ ::arrow::enable_if_fixed_size_binary NullableArray( } template -::arrow::enable_if_t< - std::is_same>::value, Status> +::arrow::enable_if_t>, + Status> NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) { std::vector valid_bytes(size, '\1'); @@ -355,44 +338,18 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, } constexpr int32_t kDecimalPrecision = precision; - constexpr int32_t kDecimalScale = Decimal128WithPrecisionAndScale::scale; - const auto type = ::arrow::decimal128(kDecimalPrecision, kDecimalScale); - const int32_t byte_width = - static_cast(*type).byte_width(); - - ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - - random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, precision, - out_buf->mutable_data()); - - ::arrow::Decimal128Builder builder(type); - RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data())); - return builder.Finish(out); -} - -template -::arrow::enable_if_t< - std::is_same>::value, Status> -NullableArray(size_t size, size_t num_nulls, uint32_t seed, - std::shared_ptr<::arrow::Array>* out) { - std::vector 
valid_bytes(size, '\1'); - - for (size_t i = 0; i < num_nulls; ++i) { - valid_bytes[i * 2] = '\0'; - } + constexpr int32_t kDecimalScale = ArrowType::scale; - constexpr int32_t kDecimalPrecision = precision; - constexpr int32_t kDecimalScale = Decimal256WithPrecisionAndScale::scale; - const auto type = ::arrow::decimal256(kDecimalPrecision, kDecimalScale); - const int32_t byte_width = - static_cast(*type).byte_width(); + const auto type = + std::make_shared(kDecimalPrecision, kDecimalScale); + const int32_t byte_width = type->byte_width(); ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); + random_decimals(size, seed, precision, + out_buf->mutable_data()); - random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, precision, - out_buf->mutable_data()); - - ::arrow::Decimal256Builder builder(type); + using Builder = typename ::arrow::TypeTraits::BuilderType; + Builder builder(type); RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data())); return builder.Finish(out); } diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 288a656d20e..f35f84f002b 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -2115,33 +2115,30 @@ struct SerializeFunctor< ArrowWriteContext* ctx, value_type* out) { if (array.null_count() == 0) { for (int64_t i = 0; i < array.length(); i++) { - out[i] = TransferValue(array.Value(i)); + out[i] = TransferValue(array.Value(i)); } } else { for (int64_t i = 0; i < array.length(); i++) { - out[i] = - array.IsValid(i) ? TransferValue(array.Value(i)) : 0; + out[i] = array.IsValid(i) ? 
TransferValue(array.Value(i)) : 0; } } return Status::OK(); } - template + private: value_type TransferValue(const uint8_t* in) const { - static_assert(byte_width == 16 || byte_width == 32, - "only 16 and 32 byte Decimals supported"); - value_type value = 0; - if constexpr (byte_width == 16) { - ::arrow::Decimal128 decimal_value(in); - PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value)); - } else { - ::arrow::Decimal256 decimal_value(in); + using DecimalValue = typename ::arrow::TypeTraits::CType; + DecimalValue decimal_value(in); + if constexpr (std::is_same_v) { // Decimal256 does not provide ToInteger, but we are sure it fits in the target // integer type. - value = static_cast(decimal_value.low_bits()); + return static_cast(decimal_value.low_bits()); + } else { + value_type value = 0; + PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value)); + return value; } - return value; } }; @@ -2179,6 +2176,8 @@ Status TypedColumnWriterImpl::WriteArrowDense( WRITE_ZERO_COPY_CASE(DATE32) WRITE_SERIALIZE_CASE(DATE64) WRITE_SERIALIZE_CASE(TIME32) + WRITE_SERIALIZE_CASE(DECIMAL32) + WRITE_SERIALIZE_CASE(DECIMAL64) WRITE_SERIALIZE_CASE(DECIMAL128) WRITE_SERIALIZE_CASE(DECIMAL256) default: @@ -2350,6 +2349,8 @@ Status TypedColumnWriterImpl::WriteArrowDense( WRITE_SERIALIZE_CASE(UINT64) WRITE_ZERO_COPY_CASE(TIME64) WRITE_ZERO_COPY_CASE(DURATION) + WRITE_SERIALIZE_CASE(DECIMAL32) + WRITE_SERIALIZE_CASE(DECIMAL64) WRITE_SERIALIZE_CASE(DECIMAL128) WRITE_SERIALIZE_CASE(DECIMAL256) default: @@ -2491,12 +2492,11 @@ struct SerializeFunctor< if (array.null_count() == 0) { for (int64_t i = 0; i < array.length(); i++) { - out[i] = FixDecimalEndianness(array.GetValue(i), offset); + out[i] = FixDecimalEndianness(array.GetValue(i), offset); } } else { for (int64_t i = 0; i < array.length(); i++) { - out[i] = array.IsValid(i) ? FixDecimalEndianness( - array.GetValue(i), offset) + out[i] = array.IsValid(i) ? 
FixDecimalEndianness(array.GetValue(i), offset) : FixedLenByteArray(); } } @@ -2504,8 +2504,9 @@ struct SerializeFunctor< return Status::OK(); } + private: // Parquet's Decimal are stored with FixedLength values where the length is - // proportional to the precision. Arrow's Decimal are always stored with 16/32 + // proportional to the precision. Arrow's Decimal are always stored with 4/8/16/32 // bytes. Thus the internal FLBA pointer must be adjusted by the offset calculated // here. int32_t Offset(const Array& array) { @@ -2519,29 +2520,37 @@ struct SerializeFunctor< int64_t non_null_count = array.length() - array.null_count(); int64_t size = non_null_count * ArrowType::kByteWidth; scratch_buffer = AllocateBuffer(ctx->memory_pool, size); - scratch = reinterpret_cast(scratch_buffer->mutable_data()); + scratch = scratch_buffer->mutable_data(); } - template FixedLenByteArray FixDecimalEndianness(const uint8_t* in, int64_t offset) { - const auto* u64_in = reinterpret_cast(in); auto out = reinterpret_cast(scratch) + offset; - static_assert(byte_width == 16 || byte_width == 32, - "only 16 and 32 byte Decimals supported"); - if (byte_width == 32) { - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[3]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[2]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + if constexpr (std::is_same_v) { + const auto* u32_in = reinterpret_cast(in); + auto p = reinterpret_cast(scratch); + *p++ = ::arrow::bit_util::ToBigEndian(u32_in[0]); + scratch = reinterpret_cast(p); } else { - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + const auto* u64_in = reinterpret_cast(in); + auto p = reinterpret_cast(scratch); + if constexpr (std::is_same_v) { + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + } else if constexpr (std::is_same_v) { + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); + *p++ = 
::arrow::bit_util::ToBigEndian(u64_in[0]); + } else if constexpr (std::is_same_v) { + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[3]); + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[2]); + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + } + scratch = reinterpret_cast(p); } return FixedLenByteArray(out); } std::shared_ptr scratch_buffer; - int64_t* scratch; + uint8_t* scratch; }; // ---------------------------------------------------------------------- @@ -2577,6 +2586,8 @@ Status TypedColumnWriterImpl::WriteArrowDense( const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) { switch (array.type()->id()) { WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY) + WRITE_SERIALIZE_CASE(DECIMAL32) + WRITE_SERIALIZE_CASE(DECIMAL64) WRITE_SERIALIZE_CASE(DECIMAL128) WRITE_SERIALIZE_CASE(DECIMAL256) WRITE_SERIALIZE_CASE(HALF_FLOAT) diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 27bf672f86e..5a1799c39d7 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -1006,7 +1006,8 @@ class PARQUET_EXPORT ArrowReaderProperties { binary_type_(kArrowDefaultBinaryType), list_type_(kArrowDefaultListType), arrow_extensions_enabled_(false), - should_load_statistics_(false) {} + should_load_statistics_(false), + smallest_decimal_enabled_(false) {} /// \brief Set whether to use the IO thread pool to parse columns in parallel. /// @@ -1126,6 +1127,24 @@ class PARQUET_EXPORT ArrowReaderProperties { /// Return whether loading statistics as much as possible. bool should_load_statistics() const { return should_load_statistics_; } + /// \brief Set whether to infer Decimal32/64 from Parquet decimal logical types. + /// + /// Default is false for compatibility, meaning that only Decimal128 and Decimal256 + /// can be inferred. 
+ void set_smallest_decimal_enabled(bool smallest_decimal_enable) { + smallest_decimal_enabled_ = smallest_decimal_enable; + } + /// \brief Whether to infer Decimal32/64 from Parquet decimal logical types. + /// + /// When enabled, Parquet decimal columns will be inferred as the smallest possible + /// Arrow Decimal type. + /// When disabled, Parquet decimal columns will be inferred as either Decimal128 or + /// Decimal256, but not Decimal32/64. + /// + /// Note: if an Arrow schema is found in the Parquet metadata, it will take priority and + /// this setting will be ignored. + bool smallest_decimal_enabled() const { return smallest_decimal_enabled_; } + private: bool use_threads_; std::unordered_set read_dict_indices_; @@ -1138,6 +1157,7 @@ class PARQUET_EXPORT ArrowReaderProperties { ::arrow::Type::type list_type_; bool arrow_extensions_enabled_; bool should_load_statistics_; + bool smallest_decimal_enabled_; }; /// EXPERIMENTAL: Constructs the default ArrowReaderProperties diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst index 880e84939b4..cc118f00057 100644 --- a/docs/source/cpp/parquet.rst +++ b/docs/source/cpp/parquet.rst @@ -460,8 +460,8 @@ physical type. +-------------------+-----------------------------+------------------------------+-----------+ | INT | INT64 | Int64 / UInt64 | | +-------------------+-----------------------------+------------------------------+-----------+ -| DECIMAL | INT32 / INT64 / BYTE_ARRAY | Decimal128 / Decimal256 | \(2) | -| | / FIXED_LENGTH_BYTE_ARRAY | | | +| DECIMAL | INT32 / INT64 / BYTE_ARRAY | Decimal32/ Decimal64 / | \(2) | +| | / FIXED_LENGTH_BYTE_ARRAY | Decimal128 / Decimal256 | | +-------------------+-----------------------------+------------------------------+-----------+ | DATE | INT32 | Date32 | \(3) | +-------------------+-----------------------------+------------------------------+-----------+ @@ -493,7 +493,8 @@ physical type. 
* \(1) On the write side, the Parquet physical type INT32 is generated. -* \(2) On the write side, a FIXED_LENGTH_BYTE_ARRAY is always emitted. +* \(2) On the write side, a FIXED_LENGTH_BYTE_ARRAY is emitted unless + ``store_decimal_as_integer`` is true and the precision is at most 18 (then INT32 / INT64). * \(3) On the write side, an Arrow Date64 is also mapped to a Parquet DATE INT32.