From b51c5175fdbbef61f8b71b7abd5ddb755dfb54dc Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Tue, 26 Aug 2025 18:57:28 +0800 Subject: [PATCH 01/16] arrow Decimal32/64 read/write parquet --- .../parquet/arrow/arrow_reader_writer_test.cc | 156 +++++++++++++++++- cpp/src/parquet/arrow/reader_internal.cc | 140 +++++++--------- cpp/src/parquet/arrow/schema.cc | 31 +++- cpp/src/parquet/arrow/schema_internal.cc | 24 ++- cpp/src/parquet/arrow/schema_internal.h | 11 +- cpp/src/parquet/arrow/test_util.h | 121 +++++--------- cpp/src/parquet/column_writer.cc | 65 ++++---- cpp/src/parquet/properties.h | 13 +- 8 files changed, 347 insertions(+), 214 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 9c74abab530..b27a13df90e 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -795,9 +795,10 @@ class ParquetIOTestBase : public ::testing::Test { class TestReadDecimals : public ParquetIOTestBase { public: - void CheckReadFromByteArrays(const std::shared_ptr& logical_type, - const std::vector>& values, - const Array& expected) { + void CheckReadFromByteArrays( + const std::shared_ptr& logical_type, + const std::vector>& values, const Array& expected, + ArrowReaderProperties properties = default_arrow_reader_properties()) { std::vector byte_arrays(values.size()); std::transform(values.begin(), values.end(), byte_arrays.begin(), [](const std::vector& bytes) { @@ -822,7 +823,6 @@ class TestReadDecimals : public ParquetIOTestBase { // The binary_type setting shouldn't affect the results for (auto binary_type : {::arrow::Type::BINARY, ::arrow::Type::LARGE_BINARY, ::arrow::Type::BINARY_VIEW}) { - ArrowReaderProperties properties; properties.set_binary_type(binary_type); ASSERT_OK_AND_ASSIGN(auto reader, ReaderFromBuffer(buffer, properties)); ReadAndCheckSingleColumnFile(std::move(reader), expected); @@ -833,6 +833,44 @@ class TestReadDecimals : public ParquetIOTestBase { // The Decimal roundtrip tests always go through the FixedLenByteArray path, // check the ByteArray case manually. +TEST_F(TestReadDecimals, Decimal32ByteArray) { + const std::vector> big_endian_decimals = { + // 123456 + {1, 226, 64}, + // 987654 + {15, 18, 6}, + // -123456 + {255, 254, 29, 192}, + }; + + ArrowReaderProperties properties = default_arrow_reader_properties(); + properties.set_smallest_decimal_enabled(true); + + auto expected = + ArrayFromJSON(::arrow::decimal32(6, 3), R"(["123.456", "987.654", "-123.456"])"); + CheckReadFromByteArrays(LogicalType::Decimal(6, 3), big_endian_decimals, *expected, + properties); +} + +TEST_F(TestReadDecimals, Decimal64ByteArray) { + const std::vector> big_endian_decimals = { + // 123456 + {1, 226, 64}, + // 987654 + {15, 18, 6}, + // -123456 + {255, 255, 255, 255, 255, 254, 29, 192}, + }; + + ArrowReaderProperties properties = default_arrow_reader_properties(); + properties.set_smallest_decimal_enabled(true); + + auto expected = + ArrayFromJSON(::arrow::decimal64(16, 3), R"(["123.456", "987.654", "-123.456"])"); + CheckReadFromByteArrays(LogicalType::Decimal(16, 3), big_endian_decimals, *expected, + properties); +} + TEST_F(TestReadDecimals, Decimal128ByteArray) { const std::vector> big_endian_decimals = { // 123456 @@ -3044,6 +3082,36 @@ TEST(ArrowReadWrite, NestedRequiredField) { /*row_group_size=*/8); } +TEST(ArrowReadWrite, Decimal32) { + using ::arrow::field; + + auto type = ::arrow::decimal32(8, 4); + + const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", + "-9999.9999", "9999.9999"])"; + auto array = ::arrow::ArrayFromJSON(type, json); + auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); + auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); + ArrowReaderProperties read_properties = default_arrow_reader_properties(); + read_properties.set_smallest_decimal_enabled(true); + CheckSimpleRoundtrip(table, 2, props_store_schema, read_properties); +} + +TEST(ArrowReadWrite, Decimal64) { + using ::arrow::field; + + auto type = ::arrow::decimal64(18, 4); + + const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", + "-9999.9999", "9999.9999"])"; + auto array = ::arrow::ArrayFromJSON(type, json); + auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); + auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); + ArrowReaderProperties read_properties = default_arrow_reader_properties(); + read_properties.set_smallest_decimal_enabled(true); + CheckSimpleRoundtrip(table, 2, props_store_schema, read_properties); +} + TEST(ArrowReadWrite, Decimal256) { using ::arrow::Decimal256; using ::arrow::field; @@ -5468,6 +5536,86 @@ TYPED_TEST(TestIntegerAnnotateDecimalTypeParquetIO, SingleNullableDecimalColumn) ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values)); } +template +class TestIntegerAnnotateSmallestDecimalTypeParquetIO + : public TestIntegerAnnotateDecimalTypeParquetIO { + public: + void ReadAndCheckSingleDecimalColumnFile(const Array& values) { + ArrowReaderProperties properties = default_arrow_reader_properties(); + properties.set_smallest_decimal_enabled(true); + + std::shared_ptr out; + std::unique_ptr reader; + this->ReaderFromSink(&reader, properties); + this->ReadSingleColumnFile(std::move(reader), &out); + + if (values.type()->id() == out->type()->id()) { + AssertArraysEqual(values, *out); + } else { + auto& expected_values = values; + auto& read_values = *out; + ASSERT_EQ(expected_values.length(), read_values.length()); + ASSERT_EQ(expected_values.null_count(), read_values.null_count()); + + auto format_decimal = [](const Array& values, int64_t index) { + switch (values.type()->id()) { + case ::arrow::Type::DECIMAL32: + return static_cast(values).FormatValue(index); + case ::arrow::Type::DECIMAL64: + return static_cast(values).FormatValue(index); + case ::arrow::Type::DECIMAL128: + return static_cast(values).FormatValue( + index); + case ::arrow::Type::DECIMAL256: + return static_cast(values).FormatValue( + index); + default: + std::string err("Unexpected decimal type: " + values.type()->ToString()); + ADD_FAILURE() << err; + return err; + } + }; + + for (int64_t i = 0; i < expected_values.length(); ++i) { + ASSERT_EQ(expected_values.IsNull(i), read_values.IsNull(i)); + if (!expected_values.IsNull(i)) { + std::string expected_str = format_decimal(expected_values, i); + std::string read_str = format_decimal(read_values, i); + ASSERT_EQ(expected_str, read_str); + } + } + } + } +}; + +using SmallestDecimalTestTypes = ::testing::Types< + Decimal32WithPrecisionAndScale<1>, Decimal32WithPrecisionAndScale<5>, + Decimal32WithPrecisionAndScale<9>, Decimal64WithPrecisionAndScale<1>, + Decimal64WithPrecisionAndScale<5>, Decimal64WithPrecisionAndScale<10>, + Decimal64WithPrecisionAndScale<18>, Decimal128WithPrecisionAndScale<1>, + Decimal128WithPrecisionAndScale<5>, Decimal128WithPrecisionAndScale<10>, + Decimal128WithPrecisionAndScale<18>, Decimal256WithPrecisionAndScale<1>, + Decimal256WithPrecisionAndScale<5>, Decimal256WithPrecisionAndScale<10>, + Decimal256WithPrecisionAndScale<18>>; + +TYPED_TEST_SUITE(TestIntegerAnnotateSmallestDecimalTypeParquetIO, + SmallestDecimalTestTypes); + +TYPED_TEST(TestIntegerAnnotateSmallestDecimalTypeParquetIO, + SingleNonNullableDecimalColumn) { + std::shared_ptr values; + ASSERT_OK(NonNullArray(SMALL_SIZE, &values)); + ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values)); + ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values)); +} + +TYPED_TEST(TestIntegerAnnotateSmallestDecimalTypeParquetIO, SingleNullableDecimalColumn) { + std::shared_ptr values; + ASSERT_OK(NullableArray(SMALL_SIZE, SMALL_SIZE / 2, kDefaultSeed, &values)); + ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values)); + ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values)); +} + template class TestBufferedParquetIO : public TestParquetIO { public: diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 6bf1e4f1e4d..7b599224239 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -69,6 +69,12 @@ using arrow::Decimal128Type; using arrow::Decimal256; using arrow::Decimal256Array; using arrow::Decimal256Type; +using arrow::Decimal32; +using arrow::Decimal32Array; +using arrow::Decimal32Type; +using arrow::Decimal64; +using arrow::Decimal64Array; +using arrow::Decimal64Type; using arrow::Field; using arrow::Int32Array; using arrow::ListArray; @@ -600,17 +606,10 @@ Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width, return ::arrow::Status::OK(); } -template -struct DecimalTypeTrait; - -template <> -struct DecimalTypeTrait<::arrow::Decimal128Array> { - using value = ::arrow::Decimal128; -}; - -template <> -struct DecimalTypeTrait<::arrow::Decimal256Array> { - using value = ::arrow::Decimal256; +template > +struct DecimalTypeTrait { + using value = typename ::arrow::TypeTraits::CType; }; template @@ -725,16 +724,17 @@ struct DecimalConverter { /// The parquet spec allows systems to write decimals in int32, int64 if the values are /// small enough to fit in less 4 bytes or less than 8 bytes, respectively. /// This function implements the conversion from int32 and int64 arrays to decimal arrays. -template < - typename DecimalArrayType, typename ParquetIntegerType, - typename = ::arrow::enable_if_t::value || - std::is_same::value>> +template ::value, + typename = ::arrow::enable_if_t<::arrow::internal::IsOneOf< + ParquetIntegerType, Int32Type, Int64Type>::value>> static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out) { - // Decimal128 and Decimal256 are only Arrow constructs. Parquet does not - // specifically distinguish between decimal byte widths. - DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128 || - field->type()->id() == ::arrow::Type::DECIMAL256); + using ArrayTypeClass = typename DecimalArrayType::TypeClass; + + // Decimal32, Decimal64, Decimal128 and Decimal256 are only Arrow constructs. Parquet + // does not specifically distinguish between decimal byte widths. + DCHECK(field->type()->id() == ArrayTypeClass::type_id); const int64_t length = reader->values_written(); @@ -745,25 +745,17 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, const auto values = reinterpret_cast(reader->values()); - const auto& decimal_type = checked_cast(*field->type()); + const auto& decimal_type = checked_cast(*field->type()); const int64_t type_length = decimal_type.byte_width(); ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool)); uint8_t* out_ptr = data->mutable_data(); - using ::arrow::bit_util::FromLittleEndian; - for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { // sign/zero extend int32_t values, otherwise a no-op const auto value = static_cast(values[i]); - - if constexpr (std::is_same_v) { - ::arrow::Decimal128 decimal(value); - decimal.ToBytes(out_ptr); - } else { - ::arrow::Decimal256 decimal(value); - decimal.ToBytes(out_ptr); - } + DecimalValue decimal(value); + decimal.ToBytes(out_ptr); } if (reader->nullable_values() && field->nullable()) { @@ -802,6 +794,33 @@ Status TransferDecimal(RecordReader* reader, MemoryPool* pool, return Status::OK(); } +template +Status TransferDecimalTo(Type::type physical_type, Args... args) { + switch (physical_type) { + case ::parquet::Type::INT32: { + auto fn = DecimalIntegerTransfer; + RETURN_NOT_OK(fn(args...)); + } break; + case ::parquet::Type::INT64: { + auto fn = DecimalIntegerTransfer; + RETURN_NOT_OK(fn(args...)); + } break; + case ::parquet::Type::BYTE_ARRAY: { + auto fn = TransferDecimal; + RETURN_NOT_OK(fn(args...)); + } break; + case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { + auto fn = TransferDecimal; + RETURN_NOT_OK(fn(args...)); + } break; + default: + return Status::Invalid( + "Physical type for decimal must be int32, int64, byte array, or fixed length " + "binary"); + } + return Status::OK(); +} + Status TransferHalfFloat(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out) { static const auto binary_type = ::arrow::fixed_size_binary(2); @@ -902,55 +921,22 @@ Status TransferColumnData(RecordReader* reader, } RETURN_NOT_OK(TransferHalfFloat(reader, pool, value_field, &result)); } break; + case ::arrow::Type::DECIMAL32: { + RETURN_NOT_OK(TransferDecimalTo(descr->physical_type(), reader, + pool, value_field, &result)); + } break; + case ::arrow::Type::DECIMAL64: { + RETURN_NOT_OK(TransferDecimalTo(descr->physical_type(), reader, + pool, value_field, &result)); + } break; case ::arrow::Type::DECIMAL128: { - switch (descr->physical_type()) { - case ::parquet::Type::INT32: { - auto fn = DecimalIntegerTransfer; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - case ::parquet::Type::INT64: { - auto fn = &DecimalIntegerTransfer; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - case ::parquet::Type::BYTE_ARRAY: { - auto fn = &TransferDecimal; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { - auto fn = &TransferDecimal; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - default: - return Status::Invalid( - "Physical type for decimal128 must be int32, int64, byte array, or fixed " - "length binary"); - } + RETURN_NOT_OK(TransferDecimalTo(descr->physical_type(), reader, + pool, value_field, &result)); + } break; + case ::arrow::Type::DECIMAL256: { + RETURN_NOT_OK(TransferDecimalTo(descr->physical_type(), reader, + pool, value_field, &result)); } break; - case ::arrow::Type::DECIMAL256: - switch (descr->physical_type()) { - case ::parquet::Type::INT32: { - auto fn = DecimalIntegerTransfer; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - case ::parquet::Type::INT64: { - auto fn = &DecimalIntegerTransfer; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - case ::parquet::Type::BYTE_ARRAY: { - auto fn = &TransferDecimal; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { - auto fn = &TransferDecimal; - RETURN_NOT_OK(fn(reader, pool, value_field, &result)); - } break; - default: - return Status::Invalid( - "Physical type for decimal256 must be int32, int64, byte array, or fixed " - "length binary"); - } - break; - case ::arrow::Type::TIMESTAMP: { const ::arrow::TimestampType& timestamp_type = checked_cast<::arrow::TimestampType&>(*value_field->type()); diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 2beb2c66efc..44418ac47d8 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -392,13 +392,15 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, static_cast(*field->type()); length = fixed_size_binary_type.byte_width(); } break; + case ArrowTypeId::DECIMAL32: + case ArrowTypeId::DECIMAL64: case ArrowTypeId::DECIMAL128: case ArrowTypeId::DECIMAL256: { const auto& decimal_type = static_cast(*field->type()); precision = decimal_type.precision(); scale = decimal_type.scale(); if (properties.store_decimal_as_integer() && 1 <= precision && precision <= 18) { - type = precision <= 9 ? ParquetType ::INT32 : ParquetType ::INT64; + type = precision <= 9 ? ParquetType::INT32 : ParquetType::INT64; } else { type = ParquetType::FIXED_LEN_BYTE_ARRAY; length = DecimalType::DecimalSize(precision); @@ -1076,10 +1078,29 @@ Result ApplyOriginalStorageMetadata(const Field& origin_field, modified = true; } - if (origin_type->id() == ::arrow::Type::DECIMAL256 && - inferred_type->id() == ::arrow::Type::DECIMAL128) { - inferred->field = inferred->field->WithType(origin_type); - modified = true; + switch (origin_type->id()) { + case ::arrow::Type::DECIMAL256: + if (inferred_type->id() == ::arrow::Type::DECIMAL128) { + inferred->field = inferred->field->WithType(origin_type); + modified = true; + break; + } + [[fallthrough]]; + case ::arrow::Type::DECIMAL128: + if (inferred_type->id() == ::arrow::Type::DECIMAL64) { + inferred->field = inferred->field->WithType(origin_type); + modified = true; + break; + } + [[fallthrough]]; + case ::arrow::Type::DECIMAL64: + if (inferred_type->id() == ::arrow::Type::DECIMAL32) { + inferred->field = inferred->field->WithType(origin_type); + modified = true; + } + break; + default: + break; } // Restore field metadata diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index 72b8f0d992b..2e8cf764b27 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -39,8 +39,12 @@ using ::arrow::internal::checked_cast; namespace { -Result> MakeArrowDecimal(const LogicalType& logical_type) { +Result> MakeArrowDecimal(const LogicalType& logical_type, + bool smallest_decimal_enabled) { const auto& decimal = checked_cast(logical_type); + if (smallest_decimal_enabled) { + return ::arrow::smallest_decimal(decimal.precision(), decimal.scale()); + } if (decimal.precision() <= ::arrow::Decimal128Type::kMaxPrecision) { return ::arrow::Decimal128Type::Make(decimal.precision(), decimal.scale()); } @@ -154,7 +158,7 @@ Result> FromByteArray( case LogicalType::Type::STRING: return utf8_type(); case LogicalType::Type::DECIMAL: - return MakeArrowDecimal(logical_type); + return MakeArrowDecimal(logical_type, reader_properties.smallest_decimal_enabled()); case LogicalType::Type::NONE: case LogicalType::Type::ENUM: case LogicalType::Type::BSON: @@ -192,7 +196,7 @@ Result> FromFLBA( const ArrowReaderProperties& reader_properties) { switch (logical_type.type()) { case LogicalType::Type::DECIMAL: - return MakeArrowDecimal(logical_type); + return MakeArrowDecimal(logical_type, reader_properties.smallest_decimal_enabled()); case LogicalType::Type::FLOAT16: return ::arrow::float16(); case LogicalType::Type::NONE: @@ -212,7 +216,8 @@ Result> FromFLBA( } // namespace -::arrow::Result> FromInt32(const LogicalType& logical_type) { +::arrow::Result> FromInt32( + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties) { switch (logical_type.type()) { case LogicalType::Type::INT: return MakeArrowInt(logical_type); @@ -221,7 +226,7 @@ ::arrow::Result> FromInt32(const LogicalType& logical case LogicalType::Type::TIME: return MakeArrowTime32(logical_type); case LogicalType::Type::DECIMAL: - return MakeArrowDecimal(logical_type); + return MakeArrowDecimal(logical_type, reader_properties.smallest_decimal_enabled()); case LogicalType::Type::NONE: return ::arrow::int32(); default: @@ -230,12 +235,13 @@ ::arrow::Result> FromInt32(const LogicalType& logical } } -Result> FromInt64(const LogicalType& logical_type) { +Result> FromInt64( + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties) { switch (logical_type.type()) { case LogicalType::Type::INT: return MakeArrowInt64(logical_type); case LogicalType::Type::DECIMAL: - return MakeArrowDecimal(logical_type); + return MakeArrowDecimal(logical_type, reader_properties.smallest_decimal_enabled()); case LogicalType::Type::TIMESTAMP: return MakeArrowTimestamp(logical_type); case LogicalType::Type::TIME: @@ -265,9 +271,9 @@ Result> GetArrowType( case ParquetType::BOOLEAN: return ::arrow::boolean(); case ParquetType::INT32: - return FromInt32(logical_type); + return FromInt32(logical_type, reader_properties); case ParquetType::INT64: - return FromInt64(logical_type); + return FromInt64(logical_type, reader_properties); case ParquetType::INT96: return ::arrow::timestamp(reader_properties.coerce_int96_timestamp_unit()); case ParquetType::FLOAT: diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h index 087b16755b0..81bf56b7a44 100644 --- a/cpp/src/parquet/arrow/schema_internal.h +++ b/cpp/src/parquet/arrow/schema_internal.h @@ -19,16 +19,19 @@ #include "arrow/result.h" #include "arrow/type_fwd.h" +#include "parquet/properties.h" #include "parquet/schema.h" namespace parquet::arrow { using ::arrow::Result; -Result> FromFLBA(const LogicalType& logical_type, - int32_t physical_length); -Result> FromInt32(const LogicalType& logical_type); -Result> FromInt64(const LogicalType& logical_type); +Result> FromInt32( + const LogicalType& logical_type, + const ArrowReaderProperties& reader_properties = default_arrow_reader_properties()); +Result> FromInt64( + const LogicalType& logical_type, + const ArrowReaderProperties& reader_properties = default_arrow_reader_properties()); Result> GetArrowType( Type::type physical_type, const LogicalType& logical_type, int type_length, diff --git a/cpp/src/parquet/arrow/test_util.h b/cpp/src/parquet/arrow/test_util.h index cfc57ce6ea7..05f6fd24ac0 100644 --- a/cpp/src/parquet/arrow/test_util.h +++ b/cpp/src/parquet/arrow/test_util.h @@ -47,25 +47,27 @@ using ::arrow::Array; using ::arrow::ChunkedArray; using ::arrow::Status; -template -struct Decimal128WithPrecisionAndScale { - static_assert(PRECISION >= 1 && PRECISION <= 38, "Invalid precision value"); - - using type = ::arrow::Decimal128Type; - static constexpr ::arrow::Type::type type_id = ::arrow::Decimal128Type::type_id; +template > +struct DecimalWithPrecisionAndScale { + using type = T; + static_assert(PRECISION >= T::kMinPrecision && PRECISION <= T::kMaxPrecision, + "Invalid precision value"); + static constexpr ::arrow::Type::type type_id = T::type_id; static constexpr int32_t precision = PRECISION; static constexpr int32_t scale = PRECISION - 1; }; - template -struct Decimal256WithPrecisionAndScale { - static_assert(PRECISION >= 1 && PRECISION <= 76, "Invalid precision value"); - - using type = ::arrow::Decimal256Type; - static constexpr ::arrow::Type::type type_id = ::arrow::Decimal256Type::type_id; - static constexpr int32_t precision = PRECISION; - static constexpr int32_t scale = PRECISION - 1; -}; +using Decimal32WithPrecisionAndScale = + DecimalWithPrecisionAndScale<::arrow::Decimal32Type, PRECISION>; +template +using Decimal64WithPrecisionAndScale = + DecimalWithPrecisionAndScale<::arrow::Decimal64Type, PRECISION>; +template +using Decimal128WithPrecisionAndScale = + DecimalWithPrecisionAndScale<::arrow::Decimal128Type, PRECISION>; +template +using Decimal256WithPrecisionAndScale = + DecimalWithPrecisionAndScale<::arrow::Decimal256Type, PRECISION>; template ::arrow::enable_if_floating_point NonNullArray( @@ -155,45 +157,25 @@ static void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t } template -::arrow::enable_if_t< - std::is_same>::value, Status> +::arrow::enable_if_t>, + Status> NonNullArray(size_t size, std::shared_ptr* out) { constexpr int32_t kDecimalPrecision = precision; - constexpr int32_t kDecimalScale = Decimal128WithPrecisionAndScale::scale; + constexpr int32_t kDecimalScale = ArrowType::scale; - const auto type = ::arrow::decimal128(kDecimalPrecision, kDecimalScale); - ::arrow::Decimal128Builder builder(type); - const int32_t byte_width = - static_cast(*type).byte_width(); + const auto type = + std::make_shared(kDecimalPrecision, kDecimalScale); + const int32_t byte_width = type->byte_width(); constexpr int32_t seed = 0; ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, kDecimalPrecision, - out_buf->mutable_data()); - - RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size)); - return builder.Finish(out); -} - -template -::arrow::enable_if_t< - std::is_same>::value, Status> -NonNullArray(size_t size, std::shared_ptr* out) { - constexpr int32_t kDecimalPrecision = precision; - constexpr int32_t kDecimalScale = Decimal256WithPrecisionAndScale::scale; - - const auto type = ::arrow::decimal256(kDecimalPrecision, kDecimalScale); - ::arrow::Decimal256Builder builder(type); - const int32_t byte_width = - static_cast(*type).byte_width(); - - constexpr int32_t seed = 0; - - ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, kDecimalPrecision, - out_buf->mutable_data()); + random_decimals(size, seed, kDecimalPrecision, + out_buf->mutable_data()); + using Builder = typename ::arrow::TypeTraits::BuilderType; + Builder builder(type); RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size)); return builder.Finish(out); } @@ -344,8 +326,9 @@ ::arrow::enable_if_fixed_size_binary NullableArray( } template -::arrow::enable_if_t< - std::is_same>::value, Status> +::arrow::enable_if_t>, + Status> NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) { std::vector valid_bytes(size, '\1'); @@ -355,44 +338,18 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, } constexpr int32_t kDecimalPrecision = precision; - constexpr int32_t kDecimalScale = Decimal128WithPrecisionAndScale::scale; - const auto type = ::arrow::decimal128(kDecimalPrecision, kDecimalScale); - const int32_t byte_width = - static_cast(*type).byte_width(); - - ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - - random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, precision, - out_buf->mutable_data()); - - ::arrow::Decimal128Builder builder(type); - RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data())); - return builder.Finish(out); -} - -template -::arrow::enable_if_t< - std::is_same>::value, Status> -NullableArray(size_t size, size_t num_nulls, uint32_t seed, - std::shared_ptr<::arrow::Array>* out) { - std::vector valid_bytes(size, '\1'); - - for (size_t i = 0; i < num_nulls; ++i) { - valid_bytes[i * 2] = '\0'; - } + constexpr int32_t kDecimalScale = ArrowType::scale; - constexpr int32_t kDecimalPrecision = precision; - constexpr int32_t kDecimalScale = Decimal256WithPrecisionAndScale::scale; - const auto type = ::arrow::decimal256(kDecimalPrecision, kDecimalScale); - const int32_t byte_width = - static_cast(*type).byte_width(); + const auto type = + std::make_shared(kDecimalPrecision, kDecimalScale); + const int32_t byte_width = type->byte_width(); ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); + random_decimals(size, seed, precision, + out_buf->mutable_data()); - random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, precision, - out_buf->mutable_data()); - - ::arrow::Decimal256Builder builder(type); + using Builder = typename ::arrow::TypeTraits::BuilderType; + Builder builder(type); RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data())); return builder.Finish(out); } diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 288a656d20e..2cb078aeee8 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -2115,33 +2115,28 @@ struct SerializeFunctor< ArrowWriteContext* ctx, value_type* out) { if (array.null_count() == 0) { for (int64_t i = 0; i < array.length(); i++) { - out[i] = TransferValue(array.Value(i)); + out[i] = TransferValue(array.Value(i)); } } else { for (int64_t i = 0; i < array.length(); i++) { - out[i] = - array.IsValid(i) ? TransferValue(array.Value(i)) : 0; + out[i] = array.IsValid(i) ? TransferValue(array.Value(i)) : 0; } } return Status::OK(); } - template + private: value_type TransferValue(const uint8_t* in) const { - static_assert(byte_width == 16 || byte_width == 32, - "only 16 and 32 byte Decimals supported"); - value_type value = 0; - if constexpr (byte_width == 16) { - ::arrow::Decimal128 decimal_value(in); - PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value)); + using DecimalValue = typename ::arrow::TypeTraits::CType; + DecimalValue decimal_value(in); + if constexpr (std::is_same_v) { + return static_cast(decimal_value.low_bits()); } else { - ::arrow::Decimal256 decimal_value(in); - // Decimal256 does not provide ToInteger, but we are sure it fits in the target - // integer type. - value = static_cast(decimal_value.low_bits()); + value_type value = 0; + PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value)); + return value; } - return value; } }; @@ -2179,6 +2174,8 @@ Status TypedColumnWriterImpl::WriteArrowDense( WRITE_ZERO_COPY_CASE(DATE32) WRITE_SERIALIZE_CASE(DATE64) WRITE_SERIALIZE_CASE(TIME32) + WRITE_SERIALIZE_CASE(DECIMAL32) + WRITE_SERIALIZE_CASE(DECIMAL64) WRITE_SERIALIZE_CASE(DECIMAL128) WRITE_SERIALIZE_CASE(DECIMAL256) default: @@ -2350,6 +2347,8 @@ Status TypedColumnWriterImpl::WriteArrowDense( WRITE_SERIALIZE_CASE(UINT64) WRITE_ZERO_COPY_CASE(TIME64) WRITE_ZERO_COPY_CASE(DURATION) + WRITE_SERIALIZE_CASE(DECIMAL32) + WRITE_SERIALIZE_CASE(DECIMAL64) WRITE_SERIALIZE_CASE(DECIMAL128) WRITE_SERIALIZE_CASE(DECIMAL256) default: @@ -2491,12 +2490,11 @@ struct SerializeFunctor< if (array.null_count() == 0) { for (int64_t i = 0; i < array.length(); i++) { - out[i] = FixDecimalEndianness(array.GetValue(i), offset); + out[i] = FixDecimalEndianness(array.GetValue(i), offset); } } else { for (int64_t i = 0; i < array.length(); i++) { - out[i] = array.IsValid(i) ? FixDecimalEndianness( - array.GetValue(i), offset) + out[i] = array.IsValid(i) ? FixDecimalEndianness(array.GetValue(i), offset) : FixedLenByteArray(); } } @@ -2504,8 +2502,9 @@ struct SerializeFunctor< return Status::OK(); } + private: // Parquet's Decimal are stored with FixedLength values where the length is - // proportional to the precision. Arrow's Decimal are always stored with 16/32 + // proportional to the precision. Arrow's Decimal are always stored with 4/8/16/32 // bytes. Thus the internal FLBA pointer must be adjusted by the offset calculated // here. int32_t Offset(const Array& array) { @@ -2519,29 +2518,29 @@ struct SerializeFunctor< int64_t non_null_count = array.length() - array.null_count(); int64_t size = non_null_count * ArrowType::kByteWidth; scratch_buffer = AllocateBuffer(ctx->memory_pool, size); - scratch = reinterpret_cast(scratch_buffer->mutable_data()); + scratch = scratch_buffer->mutable_data(); } - template FixedLenByteArray FixDecimalEndianness(const uint8_t* in, int64_t offset) { - const auto* u64_in = reinterpret_cast(in); auto out = reinterpret_cast(scratch) + offset; - static_assert(byte_width == 16 || byte_width == 32, - "only 16 and 32 byte Decimals supported"); - if (byte_width == 32) { - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[3]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[2]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + if constexpr (std::is_same_v) { + const auto* u32_in = reinterpret_cast(in); + auto p = reinterpret_cast(scratch); + *p++ = ::arrow::bit_util::ToBigEndian(u32_in[0]); + scratch = reinterpret_cast(p); } else { - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + const auto* u64_in = reinterpret_cast(in); + auto p = reinterpret_cast(scratch); + for (int i = ArrowType::kByteWidth / 8 - 1; i >= 0; i--) { + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[i]); + } + scratch = reinterpret_cast(p); } return FixedLenByteArray(out); } std::shared_ptr scratch_buffer; - int64_t* scratch; + uint8_t* scratch; }; // ---------------------------------------------------------------------- @@ -2577,6 +2576,8 @@ Status TypedColumnWriterImpl::WriteArrowDense( const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) { switch (array.type()->id()) { WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY) + WRITE_SERIALIZE_CASE(DECIMAL32) + WRITE_SERIALIZE_CASE(DECIMAL64) WRITE_SERIALIZE_CASE(DECIMAL128) WRITE_SERIALIZE_CASE(DECIMAL256) WRITE_SERIALIZE_CASE(HALF_FLOAT) diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 27bf672f86e..b6a659a9a09 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -1006,7 +1006,8 @@ class PARQUET_EXPORT ArrowReaderProperties { binary_type_(kArrowDefaultBinaryType), list_type_(kArrowDefaultListType), arrow_extensions_enabled_(false), - should_load_statistics_(false) {} + should_load_statistics_(false), + smallest_decimal_enabled_(false) {} /// \brief Set whether to use the IO thread pool to parse columns in parallel. /// @@ -1126,6 +1127,15 @@ class PARQUET_EXPORT ArrowReaderProperties { /// Return whether loading statistics as much as possible. bool should_load_statistics() const { return should_load_statistics_; } + /// \brief Set whether infer Decimal32/64 from parquet. + /// + /// Default is false. + void set_smallest_decimal_enabled(bool smallest_decimal_enable) { + smallest_decimal_enabled_ = smallest_decimal_enable; + } + /// Return whether infer Decimal32/64 from parquet. + bool smallest_decimal_enabled() const { return smallest_decimal_enabled_; } + private: bool use_threads_; std::unordered_set read_dict_indices_; @@ -1138,6 +1148,7 @@ class PARQUET_EXPORT ArrowReaderProperties { ::arrow::Type::type list_type_; bool arrow_extensions_enabled_; bool should_load_statistics_; + bool smallest_decimal_enabled_; }; /// EXPERIMENTAL: Constructs the default ArrowReaderProperties From 198f56a5f0a51307c38a90ce46a41fb67e0a31c3 Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Tue, 26 Aug 2025 19:24:08 +0800 Subject: [PATCH 02/16] fix code format --- cpp/src/parquet/properties.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index b6a659a9a09..c8f3b1a1316 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -1128,7 +1128,7 @@ class PARQUET_EXPORT ArrowReaderProperties { bool should_load_statistics() const { return should_load_statistics_; } /// \brief Set whether infer Decimal32/64 from parquet. - /// + /// /// Default is false. void set_smallest_decimal_enabled(bool smallest_decimal_enable) { smallest_decimal_enabled_ = smallest_decimal_enable; From 00c21e757d56065a43993ddd3be493f247e8b1ef Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Tue, 26 Aug 2025 19:30:06 +0800 Subject: [PATCH 03/16] add deleted comments --- cpp/src/parquet/column_writer.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 2cb078aeee8..618fa174dea 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -2131,6 +2131,8 @@ struct SerializeFunctor< using DecimalValue = typename ::arrow::TypeTraits::CType; DecimalValue decimal_value(in); if constexpr (std::is_same_v) { + // Decimal256 does not provide ToInteger, but we are sure it fits in the target + // integer type. return static_cast(decimal_value.low_bits()); } else { value_type value = 0; From ceb0b1277503353d977fcd34fbcfa324e2f78b4b Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Tue, 26 Aug 2025 19:47:23 +0800 Subject: [PATCH 04/16] refactoring some code --- cpp/src/parquet/arrow/reader_internal.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 7b599224239..162b7691b45 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -725,12 +725,12 @@ struct DecimalConverter { /// small enough to fit in less 4 bytes or less than 8 bytes, respectively. /// This function implements the conversion from int32 and int64 arrays to decimal arrays. template ::value, typename = ::arrow::enable_if_t<::arrow::internal::IsOneOf< ParquetIntegerType, Int32Type, Int64Type>::value>> static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out) { using ArrayTypeClass = typename DecimalArrayType::TypeClass; + using DecimalValue = typename DecimalTypeTrait::value; // Decimal32, Decimal64, Decimal128 and Decimal256 are only Arrow constructs. Parquet // does not specifically distinguish between decimal byte widths. From bbd9740dec82ec107488a89583d2309fb67c88eb Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Tue, 26 Aug 2025 20:31:39 +0800 Subject: [PATCH 05/16] correctly restore type from metadata --- .../parquet/arrow/arrow_reader_writer_test.cc | 8 ++---- cpp/src/parquet/arrow/schema.cc | 27 +++---------------- 2 files changed, 6 insertions(+), 29 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index b27a13df90e..f69d29dd966 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -3092,9 +3092,7 @@ TEST(ArrowReadWrite, Decimal32) { auto array = ::arrow::ArrayFromJSON(type, json); auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); - ArrowReaderProperties read_properties = default_arrow_reader_properties(); - read_properties.set_smallest_decimal_enabled(true); - CheckSimpleRoundtrip(table, 2, props_store_schema, read_properties); + CheckSimpleRoundtrip(table, 2, props_store_schema); } TEST(ArrowReadWrite, Decimal64) { @@ -3107,9 +3105,7 @@ TEST(ArrowReadWrite, Decimal64) { auto array = ::arrow::ArrayFromJSON(type, json); auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); - ArrowReaderProperties read_properties = default_arrow_reader_properties(); - read_properties.set_smallest_decimal_enabled(true); - CheckSimpleRoundtrip(table, 2, props_store_schema, read_properties); + CheckSimpleRoundtrip(table, 2, props_store_schema); } TEST(ArrowReadWrite, Decimal256) { diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 44418ac47d8..23025f610bf 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -1078,29 +1078,10 @@ Result ApplyOriginalStorageMetadata(const Field& origin_field, modified = true; } - switch (origin_type->id()) { - case ::arrow::Type::DECIMAL256: - if (inferred_type->id() == ::arrow::Type::DECIMAL128) { - inferred->field = inferred->field->WithType(origin_type); - modified = true; - break; - } - [[fallthrough]]; - case ::arrow::Type::DECIMAL128: - if (inferred_type->id() == ::arrow::Type::DECIMAL64) { - inferred->field = inferred->field->WithType(origin_type); - modified = true; - break; - } - [[fallthrough]]; - case ::arrow::Type::DECIMAL64: - if (inferred_type->id() == ::arrow::Type::DECIMAL32) { - inferred->field = inferred->field->WithType(origin_type); - modified = true; - } - break; - default: - break; + if (::arrow::is_decimal(origin_type->id()) && + ::arrow::is_decimal(inferred_type->id())) { + inferred->field = inferred->field->WithType(origin_type); + modified = true; } // Restore field metadata From 6df842a9998526ead019d4007254c60d776bcde3 Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Tue, 26 Aug 2025 20:35:38 +0800 Subject: [PATCH 06/16] improve test --- cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index f69d29dd966..5629ac36512 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -3098,7 +3098,7 @@ TEST(ArrowReadWrite, Decimal32) { TEST(ArrowReadWrite, Decimal64) { using ::arrow::field; - auto type = ::arrow::decimal64(18, 4); + auto type = ::arrow::decimal64(8, 4); const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", "-9999.9999", "9999.9999"])"; From 1baa5cc9eeade433c67f87f2fe46bf74185f7fc1 Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Wed, 27 Aug 2025 11:09:58 +0800 Subject: [PATCH 07/16] fix some review comments --- cpp/src/parquet/arrow/schema.cc | 10 ++++++++-- cpp/src/parquet/column_writer.cc | 12 ++++++++++-- cpp/src/parquet/properties.h | 6 +++++- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 23025f610bf..5bb54f15724 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -1080,8 +1080,14 @@ Result ApplyOriginalStorageMetadata(const Field& origin_field, if (::arrow::is_decimal(origin_type->id()) && ::arrow::is_decimal(inferred_type->id())) { - inferred->field = inferred->field->WithType(origin_type); - modified = true; + auto& origin_decimal = checked_cast(*origin_type); + auto& inferred_decimal = checked_cast(*inferred_type); + if (!(origin_decimal.id() == inferred_decimal.id() && + origin_decimal.scale() == inferred_decimal.scale() && + origin_decimal.precision() <= inferred_decimal.precision())) { + inferred->field = inferred->field->WithType(origin_type); + modified = true; + } } // Restore field metadata diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 618fa174dea..f35f84f002b 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -2533,8 +2533,16 @@ struct SerializeFunctor< } else { const auto* u64_in = reinterpret_cast(in); auto p = reinterpret_cast(scratch); - for (int i = ArrowType::kByteWidth / 8 - 1; i >= 0; i--) { - *p++ = ::arrow::bit_util::ToBigEndian(u64_in[i]); + if constexpr (std::is_same_v) { + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + } else if constexpr (std::is_same_v) { + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + } else if constexpr (std::is_same_v) { + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[3]); + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[2]); + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); + *p++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); } scratch = reinterpret_cast(p); } diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index c8f3b1a1316..cbd9ba0b7ed 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -1133,7 +1133,11 @@ class PARQUET_EXPORT ArrowReaderProperties { void set_smallest_decimal_enabled(bool smallest_decimal_enable) { smallest_decimal_enabled_ = smallest_decimal_enable; } - /// Return whether infer Decimal32/64 from parquet. + /// \brief Return whether to infer Decimal32/64 from parquet. + /// + /// If `store_schema` is set, this flag does nothing and we always use decimal type in + /// metadata. If `store_schema` is not set, when this flag is true, we infer decimal + /// with small precision to Decimal32/Decimal64 instead of Decimal128. bool smallest_decimal_enabled() const { return smallest_decimal_enabled_; } private: From 48d12452669014fced36d9b750b1a006095f2f3d Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Thu, 28 Aug 2025 11:17:40 +0800 Subject: [PATCH 08/16] adjust comments and simplify metadata behavior --- cpp/src/parquet/arrow/schema.cc | 6 +++--- cpp/src/parquet/properties.h | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 5bb54f15724..c091d4d746d 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -1082,9 +1082,9 @@ Result ApplyOriginalStorageMetadata(const Field& origin_field, ::arrow::is_decimal(inferred_type->id())) { auto& origin_decimal = checked_cast(*origin_type); auto& inferred_decimal = checked_cast(*inferred_type); - if (!(origin_decimal.id() == inferred_decimal.id() && - origin_decimal.scale() == inferred_decimal.scale() && - origin_decimal.precision() <= inferred_decimal.precision())) { + ARROW_DCHECK_EQ(origin_decimal.scale(), inferred_decimal.scale()); + ARROW_DCHECK_LE(origin_decimal.precision(), inferred_decimal.precision()); + if (origin_type->id() != inferred_type->id()) { inferred->field = inferred->field->WithType(origin_type); modified = true; } diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index cbd9ba0b7ed..f9ec0c9ffa1 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -1135,9 +1135,8 @@ class PARQUET_EXPORT ArrowReaderProperties { } /// \brief Return whether to infer Decimal32/64 from parquet. /// - /// If `store_schema` is set, this flag does nothing and we always use decimal type in - /// metadata. If `store_schema` is not set, when this flag is true, we infer decimal - /// with small precision to Decimal32/Decimal64 instead of Decimal128. + /// When enabled, we will infer decimal with small precision to Decimal32/Decimal64 by + /// `smallest_decimal` instead of Decimal128 based on precision. bool smallest_decimal_enabled() const { return smallest_decimal_enabled_; } private: From 5f86a960eefa40025ef1a296bae4032e1bd48d9e Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Thu, 28 Aug 2025 11:20:45 +0800 Subject: [PATCH 09/16] adjust comments --- cpp/src/parquet/properties.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index f9ec0c9ffa1..da38d8e0e31 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -1135,8 +1135,8 @@ class PARQUET_EXPORT ArrowReaderProperties { } /// \brief Return whether to infer Decimal32/64 from parquet. /// - /// When enabled, we will infer decimal with small precision to Decimal32/Decimal64 by - /// `smallest_decimal` instead of Decimal128 based on precision. + /// When enabled, decimal type will be inferred as the smallest DecimalType which is + /// able to represent that precision; otherwise always inferred as Decimal128. bool smallest_decimal_enabled() const { return smallest_decimal_enabled_; } private: From f9f1fb9161f36d51ec480775776eabb91143da18 Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Thu, 28 Aug 2025 11:49:50 +0800 Subject: [PATCH 10/16] adjust comments --- cpp/src/parquet/properties.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index da38d8e0e31..8e6edeff949 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -1127,13 +1127,13 @@ class PARQUET_EXPORT ArrowReaderProperties { /// Return whether loading statistics as much as possible. bool should_load_statistics() const { return should_load_statistics_; } - /// \brief Set whether infer Decimal32/64 from parquet. + /// \brief Set whether to infer Decimal32/64 from Parquet decimal logical types. /// /// Default is false. void set_smallest_decimal_enabled(bool smallest_decimal_enable) { smallest_decimal_enabled_ = smallest_decimal_enable; } - /// \brief Return whether to infer Decimal32/64 from parquet. + /// \brief Set whether to infer Decimal32/64 from Parquet decimal logical types. /// /// When enabled, decimal type will be inferred as the smallest DecimalType which is /// able to represent that precision; otherwise always inferred as Decimal128. From b32234d2dbc83294e99e1f3aa3ef057053730f4f Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Thu, 28 Aug 2025 11:59:25 +0800 Subject: [PATCH 11/16] remove new include and default parameter in schema_internal.h --- cpp/src/parquet/arrow/reader_internal.cc | 6 ++++-- cpp/src/parquet/arrow/schema_internal.h | 7 ++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 162b7691b45..f1866857951 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -159,7 +159,8 @@ static Status FromInt32Statistics(const Int32Statistics& statistics, const LogicalType& logical_type, std::shared_ptr<::arrow::Scalar>* min, std::shared_ptr<::arrow::Scalar>* max) { - ARROW_ASSIGN_OR_RAISE(auto type, FromInt32(logical_type)); + ARROW_ASSIGN_OR_RAISE(auto type, + FromInt32(logical_type, default_arrow_reader_properties())); switch (logical_type.type()) { case LogicalType::Type::INT: @@ -181,7 +182,8 @@ static Status FromInt64Statistics(const Int64Statistics& statistics, const LogicalType& logical_type, std::shared_ptr<::arrow::Scalar>* min, std::shared_ptr<::arrow::Scalar>* max) { - ARROW_ASSIGN_OR_RAISE(auto type, FromInt64(logical_type)); + ARROW_ASSIGN_OR_RAISE(auto type, + FromInt64(logical_type, default_arrow_reader_properties())); switch (logical_type.type()) { case LogicalType::Type::INT: diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h index 81bf56b7a44..09ad891aad3 100644 --- a/cpp/src/parquet/arrow/schema_internal.h +++ b/cpp/src/parquet/arrow/schema_internal.h @@ -19,7 +19,6 @@ #include "arrow/result.h" #include "arrow/type_fwd.h" -#include "parquet/properties.h" #include "parquet/schema.h" namespace parquet::arrow { @@ -27,11 +26,9 @@ namespace parquet::arrow { using ::arrow::Result; Result> FromInt32( - const LogicalType& logical_type, - const ArrowReaderProperties& reader_properties = default_arrow_reader_properties()); + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties); Result> FromInt64( - const LogicalType& logical_type, - const ArrowReaderProperties& reader_properties = default_arrow_reader_properties()); + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties); Result> GetArrowType( Type::type physical_type, const LogicalType& logical_type, int type_length, From 3302392c517bc5770e925506703614ec5c143aaa Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Thu, 28 Aug 2025 20:32:36 +0800 Subject: [PATCH 12/16] fix review comments --- .../parquet/arrow/arrow_reader_writer_test.cc | 97 +++++-------------- cpp/src/parquet/arrow/reader_internal.cc | 10 +- cpp/src/parquet/arrow/schema.cc | 6 +- cpp/src/parquet/properties.h | 11 ++- 4 files changed, 41 insertions(+), 83 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 5629ac36512..1b3e0badfb1 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -3082,44 +3082,19 @@ TEST(ArrowReadWrite, NestedRequiredField) { /*row_group_size=*/8); } -TEST(ArrowReadWrite, Decimal32) { +TEST(ArrowReadWrite, Decimal) { using ::arrow::field; - auto type = ::arrow::decimal32(8, 4); - const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", "-9999.9999", "9999.9999"])"; - auto array = ::arrow::ArrayFromJSON(type, json); - auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); - auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); - CheckSimpleRoundtrip(table, 2, props_store_schema); -} -TEST(ArrowReadWrite, Decimal64) { - using ::arrow::field; - - auto type = ::arrow::decimal64(8, 4); - - const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", - "-9999.9999", "9999.9999"])"; - auto array = ::arrow::ArrayFromJSON(type, json); - auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); - auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); - CheckSimpleRoundtrip(table, 2, props_store_schema); -} - -TEST(ArrowReadWrite, Decimal256) { - using ::arrow::Decimal256; - using ::arrow::field; - - auto type = ::arrow::decimal256(8, 4); - - const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", - "-9999.9999", "9999.9999"])"; - auto array = ::arrow::ArrayFromJSON(type, json); - auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); - auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); - CheckSimpleRoundtrip(table, 2, props_store_schema); + for (auto type : {::arrow::decimal32(8, 4), ::arrow::decimal64(8, 4), + ::arrow::decimal128(8, 4), ::arrow::decimal256(8, 4)}) { + auto array = ::arrow::ArrayFromJSON(type, json); + auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); + auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); + CheckSimpleRoundtrip(table, 2, props_store_schema); + } } TEST(ArrowReadWrite, DecimalStats) { @@ -5548,50 +5523,28 @@ class TestIntegerAnnotateSmallestDecimalTypeParquetIO if (values.type()->id() == out->type()->id()) { AssertArraysEqual(values, *out); } else { - auto& expected_values = values; - auto& read_values = *out; - ASSERT_EQ(expected_values.length(), read_values.length()); - ASSERT_EQ(expected_values.null_count(), read_values.null_count()); - - auto format_decimal = [](const Array& values, int64_t index) { - switch (values.type()->id()) { - case ::arrow::Type::DECIMAL32: - return static_cast(values).FormatValue(index); - case ::arrow::Type::DECIMAL64: - return static_cast(values).FormatValue(index); - case ::arrow::Type::DECIMAL128: - return static_cast(values).FormatValue( - index); - case ::arrow::Type::DECIMAL256: - return static_cast(values).FormatValue( - index); - default: - std::string err("Unexpected decimal type: " + values.type()->ToString()); - ADD_FAILURE() << err; - return err; - } - }; - - for (int64_t i = 0; i < expected_values.length(); ++i) { - ASSERT_EQ(expected_values.IsNull(i), read_values.IsNull(i)); - if (!expected_values.IsNull(i)) { - std::string expected_str = format_decimal(expected_values, i); - std::string read_str = format_decimal(read_values, i); - ASSERT_EQ(expected_str, read_str); - } - } + auto decimal_type = checked_pointer_cast<::arrow::DecimalType>(values.type()); + + ASSERT_OK_AND_ASSIGN( + const auto expected_values, + ::arrow::compute::Cast(values, ::arrow::decimal256(decimal_type->precision(), + decimal_type->scale()))); + ASSERT_OK_AND_ASSIGN( + const auto out_values, + ::arrow::compute::Cast(*out, ::arrow::decimal256(decimal_type->precision(), + decimal_type->scale()))); + + ASSERT_EQ(expected_values->length(), out_values->length()); + ASSERT_EQ(expected_values->null_count(), out_values->null_count()); + ASSERT_TRUE(expected_values->Equals(*out_values)); } } }; using SmallestDecimalTestTypes = ::testing::Types< - Decimal32WithPrecisionAndScale<1>, Decimal32WithPrecisionAndScale<5>, - Decimal32WithPrecisionAndScale<9>, Decimal64WithPrecisionAndScale<1>, - Decimal64WithPrecisionAndScale<5>, Decimal64WithPrecisionAndScale<10>, - Decimal64WithPrecisionAndScale<18>, Decimal128WithPrecisionAndScale<1>, - Decimal128WithPrecisionAndScale<5>, Decimal128WithPrecisionAndScale<10>, - Decimal128WithPrecisionAndScale<18>, Decimal256WithPrecisionAndScale<1>, - Decimal256WithPrecisionAndScale<5>, Decimal256WithPrecisionAndScale<10>, + Decimal32WithPrecisionAndScale<9>, Decimal64WithPrecisionAndScale<9>, + Decimal64WithPrecisionAndScale<18>, Decimal128WithPrecisionAndScale<9>, + Decimal128WithPrecisionAndScale<18>, Decimal256WithPrecisionAndScale<9>, Decimal256WithPrecisionAndScale<18>>; TYPED_TEST_SUITE(TestIntegerAnnotateSmallestDecimalTypeParquetIO, diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index f1866857951..b622e93e072 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -797,23 +797,23 @@ Status TransferDecimal(RecordReader* reader, MemoryPool* pool, } template -Status TransferDecimalTo(Type::type physical_type, Args... args) { +Status TransferDecimalTo(Type::type physical_type, Args&&... args) { switch (physical_type) { case ::parquet::Type::INT32: { auto fn = DecimalIntegerTransfer; - RETURN_NOT_OK(fn(args...)); + RETURN_NOT_OK(fn(std::forward(args)...)); } break; case ::parquet::Type::INT64: { auto fn = DecimalIntegerTransfer; - RETURN_NOT_OK(fn(args...)); + RETURN_NOT_OK(fn(std::forward(args)...)); } break; case ::parquet::Type::BYTE_ARRAY: { auto fn = TransferDecimal; - RETURN_NOT_OK(fn(args...)); + RETURN_NOT_OK(fn(std::forward(args)...)); } break; case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { auto fn = TransferDecimal; - RETURN_NOT_OK(fn(args...)); + RETURN_NOT_OK(fn(std::forward(args)...)); } break; default: return Status::Invalid( diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index c091d4d746d..293ae94b94d 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -1082,9 +1082,9 @@ Result ApplyOriginalStorageMetadata(const Field& origin_field, ::arrow::is_decimal(inferred_type->id())) { auto& origin_decimal = checked_cast(*origin_type); auto& inferred_decimal = checked_cast(*inferred_type); - ARROW_DCHECK_EQ(origin_decimal.scale(), inferred_decimal.scale()); - ARROW_DCHECK_LE(origin_decimal.precision(), inferred_decimal.precision()); - if (origin_type->id() != inferred_type->id()) { + if (origin_decimal.precision() == inferred_decimal.precision() && + origin_decimal.scale() == inferred_decimal.scale() && + origin_decimal.id() != inferred_decimal.id()) { inferred->field = inferred->field->WithType(origin_type); modified = true; } diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 8e6edeff949..0c1b1e5853a 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -1133,10 +1133,15 @@ class PARQUET_EXPORT ArrowReaderProperties { void set_smallest_decimal_enabled(bool smallest_decimal_enable) { smallest_decimal_enabled_ = smallest_decimal_enable; } - /// \brief Set whether to infer Decimal32/64 from Parquet decimal logical types. + /// \brief Whether to infer Decimal32/64 from Parquet decimal logical types. + /// + /// When enabled, Parquet decimal columns will be inferred as the smallest possible + /// Arrow Decimal type. + /// When disabled, Parquet decimal columns will be inferred as either Decimal128 or + /// Decimal256, but not Decimal32/64. /// - /// When enabled, decimal type will be inferred as the smallest DecimalType which is - /// able to represent that precision; otherwise always inferred as Decimal128. + /// Note: if an Arrow schema is found in the Parquet metadata, it will take priority and + /// this setting will be ignored. bool smallest_decimal_enabled() const { return smallest_decimal_enabled_; } private: From dab36753d4905120bbb05f743102986904a47585 Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Thu, 28 Aug 2025 20:40:54 +0800 Subject: [PATCH 13/16] fix review comments --- cpp/src/parquet/properties.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 0c1b1e5853a..5a1799c39d7 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -1129,7 +1129,8 @@ class PARQUET_EXPORT ArrowReaderProperties { /// \brief Set whether to infer Decimal32/64 from Parquet decimal logical types. /// - /// Default is false. + /// Default is false for compatibility, meaning that only Decimal128 and Decimal256 + /// can be inferred. void set_smallest_decimal_enabled(bool smallest_decimal_enable) { smallest_decimal_enabled_ = smallest_decimal_enable; } From 17bdac904abefaa50039808bf7eda924afe2bc8a Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Thu, 28 Aug 2025 21:07:57 +0800 Subject: [PATCH 14/16] modify parquet.rst --- docs/source/cpp/parquet.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst index 880e84939b4..00f25378c8f 100644 --- a/docs/source/cpp/parquet.rst +++ b/docs/source/cpp/parquet.rst @@ -460,8 +460,8 @@ physical type. +-------------------+-----------------------------+------------------------------+-----------+ | INT | INT64 | Int64 / UInt64 | | +-------------------+-----------------------------+------------------------------+-----------+ -| DECIMAL | INT32 / INT64 / BYTE_ARRAY | Decimal128 / Decimal256 | \(2) | -| | / FIXED_LENGTH_BYTE_ARRAY | | | +| DECIMAL | INT32 / INT64 / BYTE_ARRAY | Decimal32/ Decimal64 / | \(2) | +| | / FIXED_LENGTH_BYTE_ARRAY | Decimal128 / Decimal256 | | +-------------------+-----------------------------+------------------------------+-----------+ | DATE | INT32 | Date32 | \(3) | +-------------------+-----------------------------+------------------------------+-----------+ From 327bcbc1f53c9524a518e74babb82c2ebee5e226 Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Tue, 2 Sep 2025 10:37:17 +0800 Subject: [PATCH 15/16] fix review --- docs/source/cpp/parquet.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst index 00f25378c8f..c772c86485f 100644 --- a/docs/source/cpp/parquet.rst +++ b/docs/source/cpp/parquet.rst @@ -493,7 +493,8 @@ physical type. * \(1) On the write side, the Parquet physical type INT32 is generated. -* \(2) On the write side, a FIXED_LENGTH_BYTE_ARRAY is always emitted. +* \(2) On the write side, a FIXED_LENGTH_BYTE_ARRAY is always emitted + except if `store_decimal_as_integer` is set to true. * \(3) On the write side, an Arrow Date64 is also mapped to a Parquet DATE INT32. From d8c5503eb70ef8144c2b4cad10cd5f313a5d4dc0 Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Tue, 2 Sep 2025 21:07:56 +0800 Subject: [PATCH 16/16] fix tests --- docs/source/cpp/parquet.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst index c772c86485f..cc118f00057 100644 --- a/docs/source/cpp/parquet.rst +++ b/docs/source/cpp/parquet.rst @@ -494,7 +494,7 @@ physical type. * \(1) On the write side, the Parquet physical type INT32 is generated. * \(2) On the write side, a FIXED_LENGTH_BYTE_ARRAY is always emitted - except if `store_decimal_as_integer` is set to true. + except if ``store_decimal_as_integer`` is set to true. * \(3) On the write side, an Arrow Date64 is also mapped to a Parquet DATE INT32.