diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc index bd8cbdb0430..bfb8ab692e4 100644 --- a/cpp/src/arrow/compute/kernels/vector_hash.cc +++ b/cpp/src/arrow/compute/kernels/vector_hash.cc @@ -556,6 +556,7 @@ KernelInit GetHashInit(Type::type type_id) { case Type::DATE32: case Type::TIME32: case Type::INTERVAL_MONTHS: + case Type::DECIMAL32: return HashInit>; case Type::INT64: case Type::UINT64: @@ -565,6 +566,7 @@ KernelInit GetHashInit(Type::type type_id) { case Type::TIMESTAMP: case Type::DURATION: case Type::INTERVAL_DAY_TIME: + case Type::DECIMAL64: return HashInit>; case Type::BINARY: case Type::STRING: @@ -708,7 +710,7 @@ void AddHashKernels(VectorFunction* func, VectorKernel base, OutputType out_ty) DCHECK_OK(func->AddKernel(base)); } - for (auto t : {Type::DECIMAL128, Type::DECIMAL256}) { + for (auto t : {Type::DECIMAL32, Type::DECIMAL64, Type::DECIMAL128, Type::DECIMAL256}) { base.init = GetHashInit(t); base.signature = KernelSignature::Make({t}, out_ty); DCHECK_OK(func->AddKernel(base)); diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc index 6c6f1b36b84..076c7d8d32d 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection.cc @@ -309,7 +309,8 @@ std::shared_ptr MakeIndicesNonZeroFunction(std::string name, AddKernels(NumericTypes()); AddKernels({boolean()}); - for (const auto& ty : {Type::DECIMAL128, Type::DECIMAL256}) { + for (const auto& ty : + {Type::DECIMAL32, Type::DECIMAL64, Type::DECIMAL128, Type::DECIMAL256}) { kernel.signature = KernelSignature::Make({ty}, uint64()); DCHECK_OK(func->AddKernel(kernel)); } diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index fd601b673c4..621a412a025 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -131,6 +131,8 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties( parquet_scan_options.arrow_reader_properties->cache_options()); arrow_properties.set_io_context( parquet_scan_options.arrow_reader_properties->io_context()); + arrow_properties.set_smallest_decimal_enabled( + parquet_scan_options.arrow_reader_properties->smallest_decimal_enabled()); arrow_properties.set_use_threads(options.use_threads); return arrow_properties; } diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index d5b9e3371d0..55d98fc000c 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -181,6 +181,14 @@ std::shared_ptr get_logical_type(const DataType& type) { static_cast(type); return get_logical_type(*dict_type.value_type()); } + case ArrowId::DECIMAL32: { + const auto& dec_type = static_cast(type); + return LogicalType::Decimal(dec_type.precision(), dec_type.scale()); + } + case ArrowId::DECIMAL64: { + const auto& dec_type = static_cast(type); + return LogicalType::Decimal(dec_type.precision(), dec_type.scale()); + } case ArrowId::DECIMAL128: { const auto& dec_type = static_cast(type); return LogicalType::Decimal(dec_type.precision(), dec_type.scale()); @@ -206,9 +214,11 @@ ParquetType::type get_physical_type(const DataType& type) { case ArrowId::INT16: case ArrowId::UINT32: case ArrowId::INT32: + case ArrowId::DECIMAL32: return ParquetType::INT32; case ArrowId::UINT64: case ArrowId::INT64: + case ArrowId::DECIMAL64: return ParquetType::INT64; case 
ArrowId::FLOAT: return ParquetType::FLOAT; @@ -439,17 +449,22 @@ void CheckConfiguredRoundtrip( } } -void DoSimpleRoundtrip(const std::shared_ptr& table, bool use_threads, - int64_t row_group_size, const std::vector& column_subset, - std::shared_ptr
<Table>* out, - const std::shared_ptr<ArrowWriterProperties>& arrow_properties = - default_arrow_writer_properties()) { +void DoSimpleRoundtrip( + const std::shared_ptr
<Table>& table, bool use_threads, int64_t row_group_size, + const std::vector<int>& column_subset, std::shared_ptr<Table>
* out, + const std::shared_ptr& arrow_properties = + default_arrow_writer_properties(), + const ArrowReaderProperties& reader_properties = default_arrow_reader_properties()) { std::shared_ptr buffer; ASSERT_NO_FATAL_FAILURE( WriteTableToBuffer(table, row_group_size, arrow_properties, &buffer)); - ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared(buffer), - ::arrow::default_memory_pool())); + std::unique_ptr reader; + FileReaderBuilder builder; + ASSERT_OK(builder.Open(std::make_shared(buffer))); + ASSERT_OK(builder.properties(reader_properties) + ->memory_pool(::arrow::default_memory_pool()) + ->Build(&reader)); reader->set_use_threads(use_threads); if (column_subset.size() > 0) { @@ -464,18 +479,18 @@ void DoRoundTripWithBatches( const std::shared_ptr
<Table>& table, bool use_threads, int64_t row_group_size, const std::vector<int>& column_subset, std::shared_ptr<Table>
* out, const std::shared_ptr& arrow_writer_properties = - default_arrow_writer_properties()) { + default_arrow_writer_properties(), + ArrowReaderProperties reader_properties = default_arrow_reader_properties()) { std::shared_ptr buffer; + reader_properties.set_batch_size(row_group_size - 1); ASSERT_NO_FATAL_FAILURE( WriteTableToBuffer(table, row_group_size, arrow_writer_properties, &buffer)); std::unique_ptr reader; FileReaderBuilder builder; ASSERT_OK_NO_THROW(builder.Open(std::make_shared(buffer))); - ArrowReaderProperties arrow_reader_properties; - arrow_reader_properties.set_batch_size(row_group_size - 1); ASSERT_OK_NO_THROW(builder.memory_pool(::arrow::default_memory_pool()) - ->properties(arrow_reader_properties) + ->properties(reader_properties) ->Build(&reader)); std::unique_ptr<::arrow::RecordBatchReader> batch_reader; if (column_subset.size() > 0) { @@ -496,20 +511,21 @@ void DoRoundTripWithBatches( void CheckSimpleRoundtrip( const std::shared_ptr
<Table>& table, int64_t row_group_size, const std::shared_ptr<ArrowWriterProperties>& arrow_writer_properties = - default_arrow_writer_properties()) { + default_arrow_writer_properties(), + const ArrowReaderProperties& reader_properties = default_arrow_reader_properties()) { std::shared_ptr<Table>
result; ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(table, false /* use_threads */, row_group_size, {}, &result, - arrow_writer_properties)); + arrow_writer_properties, reader_properties)); ::arrow::AssertSchemaEqual(*table->schema(), *result->schema(), /*check_metadata=*/false); ASSERT_OK(result->ValidateFull()); ::arrow::AssertTablesEqual(*table, *result, false); - ASSERT_NO_FATAL_FAILURE(DoRoundTripWithBatches(table, false /* use_threads */, - row_group_size, {}, &result, - arrow_writer_properties)); + ASSERT_NO_FATAL_FAILURE( + DoRoundTripWithBatches(table, false /* use_threads */, row_group_size, {}, &result, + arrow_writer_properties, reader_properties)); ::arrow::AssertSchemaEqual(*table->schema(), *result->schema(), /*check_metadata=*/false); ASSERT_OK(result->ValidateFull()); @@ -533,6 +549,8 @@ static std::shared_ptr MakeSimpleSchema(const DataType& type, case ::arrow::Type::HALF_FLOAT: byte_width = sizeof(::arrow::HalfFloatType::c_type); break; + case ::arrow::Type::DECIMAL32: + case ::arrow::Type::DECIMAL64: case ::arrow::Type::DECIMAL128: case ::arrow::Type::DECIMAL256: { const auto& decimal_type = static_cast(values_type); @@ -548,6 +566,8 @@ static std::shared_ptr MakeSimpleSchema(const DataType& type, case ::arrow::Type::HALF_FLOAT: byte_width = sizeof(::arrow::HalfFloatType::c_type); break; + case ::arrow::Type::DECIMAL32: + case ::arrow::Type::DECIMAL64: case ::arrow::Type::DECIMAL128: case ::arrow::Type::DECIMAL256: { const auto& decimal_type = static_cast(type); @@ -620,6 +640,30 @@ class ParquetIOTestBase : public ::testing::Test { return ParquetFileWriter::Open(sink_, schema); } + template + ::arrow::enable_if_t< + !std::is_base_of::value, + ArrowReaderProperties> + ReaderPropertiesFromArrowType() { + return default_arrow_reader_properties(); + } + + template + ::arrow::enable_if_t< + std::is_base_of::value, + ArrowReaderProperties> + ReaderPropertiesFromArrowType() { + auto properties = default_arrow_reader_properties(); + properties.set_smallest_decimal_enabled(smallest_decimal_enabled); + return properties; + } + + template + void ReaderFromSinkTemplate(std::unique_ptr* out) { + ReaderFromSink(out, this->template ReaderPropertiesFromArrowType()); + } + void ReaderFromSink( std::unique_ptr* out, const ArrowReaderProperties& properties = default_arrow_reader_properties()) { @@ -646,11 +690,19 @@ class ParquetIOTestBase : public ::testing::Test { ASSERT_OK((*out)->ValidateFull()); } - void ReadAndCheckSingleColumnFile(const Array& values) { + template + void ReadAndCheckSingleColumnFileTemplate(const Array& values) { + ReadAndCheckSingleColumnFile( + values, this->template ReaderPropertiesFromArrowType()); + } + + void ReadAndCheckSingleColumnFile( + const Array& values, + const ArrowReaderProperties& properties = default_arrow_reader_properties()) { std::shared_ptr out; std::unique_ptr reader; - ReaderFromSink(&reader); + ReaderFromSink(&reader, properties); ReadSingleColumnFile(std::move(reader), &out); AssertArraysEqual(values, *out); @@ -707,10 +759,11 @@ class ParquetIOTestBase : public ::testing::Test { *out = MakeSimpleTable(lists, true /* nullable_lists */); } + template void ReadAndCheckSingleColumnTable(const std::shared_ptr& values) { std::shared_ptr<::arrow::Table> out; std::unique_ptr reader; - ReaderFromSink(&reader); + ReaderFromSinkTemplate(&reader); ReadTableFromFile(std::move(reader), &out); ASSERT_EQ(1, out->num_columns()); ASSERT_EQ(values->length(), out->num_rows()); @@ -722,8 +775,15 @@ class ParquetIOTestBase : public ::testing::Test { 
AssertArraysEqual(*values, *result); } - void CheckRoundTrip(const std::shared_ptr
<Table>& table) { - CheckSimpleRoundtrip(table, table->num_rows()); + template + void CheckRoundTripTemplate(const std::shared_ptr
<Table>& table) { + CheckRoundTrip(table, this->template ReaderPropertiesFromArrowType()); + } + + void CheckRoundTrip(const std::shared_ptr<Table>
& table, + const ArrowReaderProperties& reader_properties) { + CheckSimpleRoundtrip(table, table->num_rows(), default_arrow_writer_properties(), + reader_properties); } template @@ -753,9 +813,10 @@ class ParquetIOTestBase : public ::testing::Test { class TestReadDecimals : public ParquetIOTestBase { public: - void CheckReadFromByteArrays(const std::shared_ptr& logical_type, - const std::vector>& values, - const Array& expected) { + void CheckReadFromByteArrays( + const std::shared_ptr& logical_type, + const std::vector>& values, const Array& expected, + const ArrowReaderProperties& properties = default_arrow_reader_properties()) { std::vector byte_arrays(values.size()); std::transform(values.begin(), values.end(), byte_arrays.begin(), [](const std::vector& bytes) { @@ -776,13 +837,12 @@ class TestReadDecimals : public ParquetIOTestBase { column_writer->Close(); file_writer->Close(); - ReadAndCheckSingleColumnFile(expected); + ReadAndCheckSingleColumnFile(expected, properties); } }; // The Decimal roundtrip tests always go through the FixedLenByteArray path, // check the ByteArray case manually. - TEST_F(TestReadDecimals, Decimal128ByteArray) { const std::vector> big_endian_decimals = { // 123456 @@ -814,6 +874,73 @@ TEST_F(TestReadDecimals, Decimal256ByteArray) { CheckReadFromByteArrays(LogicalType::Decimal(40, 3), big_endian_decimals, *expected); } +TEST_F(TestReadDecimals, DecimalByteArraySmallestDecimal) { + const std::vector>> big_endian_decimals_vector = { + { + // 123456 + {1, 226, 64}, + // 987654 + {15, 18, 6}, + // -123456 + {255, 254, 29, 192}, + }, + { + // 123456 + {1, 226, 64}, + // 987654 + {15, 18, 6}, + // -123456 + {255, 254, 29, 192}, + // -123456 + {255, 255, 255, 255, 255, 254, 29, 192}, + }, + { + // 123456 + {1, 226, 64}, + // 987654 + {15, 18, 6}, + // -123456 + {255, 254, 29, 192}, + // -123456 + {255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 254, 29, 192}, + }, + { + // 123456 + {1, 226, 64}, + // 987654 + {15, 18, 6}, + // -123456 + {255, 254, 29, 192}, + // -123456 + {255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 255, 255, 255, 255, 255, 254, 29, 192}, + }}; + + const std::vector, std::shared_ptr>> + expected_vector = { + {ArrayFromJSON(::arrow::decimal32(6, 3), + R"(["123.456", "987.654", "-123.456"])"), + LogicalType::Decimal(6, 3)}, + {ArrayFromJSON(::arrow::decimal64(16, 3), + R"(["123.456", "987.654", "-123.456", "-123.456"])"), + LogicalType::Decimal(16, 3)}, + {ArrayFromJSON(::arrow::decimal128(20, 3), + R"(["123.456", "987.654", "-123.456", "-123.456"])"), + LogicalType::Decimal(20, 3)}, + {ArrayFromJSON(::arrow::decimal256(40, 3), + R"(["123.456", "987.654", "-123.456", "-123.456"])"), + LogicalType::Decimal(40, 3)}}; + + ArrowReaderProperties reader_props = default_arrow_reader_properties(); + reader_props.set_smallest_decimal_enabled(true); + for (size_t i = 0; i < expected_vector.size(); ++i) { + CheckReadFromByteArrays(expected_vector.at(i).second, + big_endian_decimals_vector.at(i), + *expected_vector.at(i).first, reader_props); + } +} + template class TestParquetIO : public ParquetIOTestBase { public: @@ -858,6 +985,9 @@ typedef ::testing::Types< ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, ::arrow::HalfFloatType, + Decimal32WithPrecisionAndScale<1, true>, 
Decimal32WithPrecisionAndScale<5, true>, + Decimal64WithPrecisionAndScale<10, true>, Decimal64WithPrecisionAndScale<18, true>, + Decimal128WithPrecisionAndScale<19, true>, Decimal256WithPrecisionAndScale<39, true>, Decimal128WithPrecisionAndScale<1>, Decimal128WithPrecisionAndScale<5>, Decimal128WithPrecisionAndScale<10>, Decimal128WithPrecisionAndScale<19>, Decimal128WithPrecisionAndScale<23>, Decimal128WithPrecisionAndScale<27>, @@ -875,7 +1005,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) { MakeSimpleSchema(*values->type(), Repetition::REQUIRED); ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, values)); - ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); + ASSERT_NO_FATAL_FAILURE( + this->template ReadAndCheckSingleColumnFileTemplate(*values)); } TYPED_TEST(TestParquetIO, ZeroChunksTable) { @@ -908,7 +1039,7 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { std::shared_ptr
out; std::unique_ptr reader; - ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); + ASSERT_NO_FATAL_FAILURE(this->template ReaderFromSinkTemplate(&reader)); ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out)); ASSERT_EQ(1, out->num_columns()); EXPECT_EQ(table->num_rows(), out->num_rows()); @@ -929,7 +1060,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { MakeSimpleSchema(*values->type(), Repetition::OPTIONAL); ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, values)); - ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); + ASSERT_NO_FATAL_FAILURE( + this->template ReadAndCheckSingleColumnFileTemplate(*values)); } TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) { @@ -954,7 +1086,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) { MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL); ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, dict_values)); - ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); + ASSERT_NO_FATAL_FAILURE( + this->template ReadAndCheckSingleColumnFileTemplate(*values)); } TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) { @@ -965,12 +1098,14 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) { std::shared_ptr sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE); ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values)); - ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values)); + ASSERT_NO_FATAL_FAILURE( + this->template ReadAndCheckSingleColumnFileTemplate(*sliced_values)); // Slice offset 1 higher sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE); ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values)); - ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values)); + ASSERT_NO_FATAL_FAILURE( + this->template ReadAndCheckSingleColumnFileTemplate(*sliced_values)); } TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) { @@ -981,12 +1116,14 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) { std::shared_ptr sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE); ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values)); - ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values)); + ASSERT_NO_FATAL_FAILURE( + this->template ReadAndCheckSingleColumnFileTemplate(*sliced_values)); // Slice offset 1 higher, thus different null bitmap. sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE); ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values)); - ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values)); + ASSERT_NO_FATAL_FAILURE( + this->template ReadAndCheckSingleColumnFileTemplate(*sliced_values)); } TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { @@ -995,44 +1132,44 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { ASSERT_OK(NullableArray(SMALL_SIZE, 10, kDefaultSeed, &values)); std::shared_ptr
<Table> table = MakeSimpleTable(values, true); - ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); + ASSERT_NO_FATAL_FAILURE(this->template CheckRoundTripTemplate(table)); } TYPED_TEST(TestParquetIO, SingleEmptyListsColumnReadWrite) { std::shared_ptr
<Table> table; ASSERT_NO_FATAL_FAILURE(this->PrepareEmptyListsTable(SMALL_SIZE, &table)); - ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); + ASSERT_NO_FATAL_FAILURE(this->template CheckRoundTripTemplate(table)); } TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) { std::shared_ptr
<Table> table; this->PrepareListTable(SMALL_SIZE, true, true, 10, &table); - this->CheckRoundTrip(table); + this->template CheckRoundTripTemplate(table); } TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) { std::shared_ptr
<Table> table; ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, false, true, 10, &table)); - ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); + ASSERT_NO_FATAL_FAILURE(this->template CheckRoundTripTemplate(table)); } TYPED_TEST(TestParquetIO, SingleNullableListRequiredColumnReadWrite) { std::shared_ptr
<Table> table; ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, true, false, 10, &table)); - ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); + ASSERT_NO_FATAL_FAILURE(this->template CheckRoundTripTemplate(table)); } TYPED_TEST(TestParquetIO, SingleRequiredListRequiredColumnReadWrite) { std::shared_ptr
<Table> table; ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, false, false, 0, &table)); - ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); + ASSERT_NO_FATAL_FAILURE(this->template CheckRoundTripTemplate(table)); } TYPED_TEST(TestParquetIO, SingleNullableListRequiredListRequiredColumnReadWrite) { std::shared_ptr<Table>
table; ASSERT_NO_FATAL_FAILURE( this->PrepareListOfListTable(SMALL_SIZE, true, false, false, 0, &table)); - ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); + ASSERT_NO_FATAL_FAILURE(this->template CheckRoundTripTemplate(table)); } TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { @@ -1059,7 +1196,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { } ASSERT_OK_NO_THROW(writer->Close()); - ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); + ASSERT_NO_FATAL_FAILURE( + this->template ReadAndCheckSingleColumnFileTemplate(*values)); } TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { @@ -1071,7 +1209,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { ASSERT_OK_NO_THROW(WriteTable(*table, default_memory_pool(), this->sink_, 512, default_writer_properties())); - ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values)); + ASSERT_NO_FATAL_FAILURE( + this->template ReadAndCheckSingleColumnTable(values)); } TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) { @@ -1081,12 +1220,14 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) { this->ResetSink(); auto buffer = AllocateBuffer(); + auto reader_properties = this->template ReaderPropertiesFromArrowType(); { // BufferOutputStream closed on gc auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer); ASSERT_OK_NO_THROW(WriteTable(*table, default_memory_pool(), arrow_sink_, 512, - default_writer_properties())); + default_writer_properties(), + default_arrow_writer_properties())); // XXX: Remove this after ARROW-455 completed ASSERT_OK(arrow_sink_->Close()); @@ -1096,7 +1237,13 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) { auto source = std::make_shared(pbuffer); std::shared_ptr<::arrow::Table> out; - ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(source, ::arrow::default_memory_pool())); + std::unique_ptr reader; + FileReaderBuilder builder; + ASSERT_OK(builder.Open(source)); + ASSERT_OK(builder.properties(reader_properties) + ->memory_pool(::arrow::default_memory_pool()) + ->Build(&reader)); + ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out)); ASSERT_EQ(1, out->num_columns()); ASSERT_EQ(values->length(), out->num_rows()); @@ -1132,7 +1279,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { } ASSERT_OK_NO_THROW(writer->Close()); - ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); + ASSERT_NO_FATAL_FAILURE( + this->template ReadAndCheckSingleColumnFileTemplate(*values)); } TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { @@ -1145,7 +1293,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, default_writer_properties())); - ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values)); + ASSERT_NO_FATAL_FAILURE( + this->template ReadAndCheckSingleColumnTable(values)); } TYPED_TEST(TestParquetIO, FileMetaDataWrite) { @@ -1186,7 +1335,7 @@ TYPED_TEST(TestParquetIO, CheckIterativeColumnRead) { values->length(), default_writer_properties())); std::unique_ptr reader; - this->ReaderFromSink(&reader); + this->template ReaderFromSinkTemplate(&reader); std::unique_ptr column_reader; ASSERT_OK_NO_THROW(reader->GetColumn(0, &column_reader)); ASSERT_NE(nullptr, column_reader.get()); @@ -1279,7 +1428,8 @@ TEST_F(TestUInt32ParquetIO, Parquet_2_6_Compatibility) { ->build(); ASSERT_OK_NO_THROW( 
WriteTable(*table, default_memory_pool(), this->sink_, 512, properties)); - ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values)); + ASSERT_NO_FATAL_FAILURE( + this->template ReadAndCheckSingleColumnTable<::arrow::UInt32Type>(values)); } using TestDurationParquetIO = TestParquetIO<::arrow::DurationType>; @@ -5290,13 +5440,17 @@ class TestIntegerAnnotateDecimalTypeParquetIO : public TestParquetIO { void ReadAndCheckSingleDecimalColumnFile(const Array& values) { std::shared_ptr out; std::unique_ptr reader; - this->ReaderFromSink(&reader); + auto reader_properties = this->template ReaderPropertiesFromArrowType(); + this->ReaderFromSink(&reader, reader_properties); this->ReadSingleColumnFile(std::move(reader), &out); - // Reader always read values as DECIMAL128 type - ASSERT_EQ(out->type()->id(), ::arrow::Type::DECIMAL128); + auto expected_type_id = reader_properties.smallest_decimal_enabled() + ? TestType::type_id + // Reader always read values as DECIMAL128 type + : ::arrow::Type::DECIMAL128; + ASSERT_EQ(out->type()->id(), expected_type_id); - if (values.type()->id() == ::arrow::Type::DECIMAL128) { + if (values.type()->id() == expected_type_id) { AssertArraysEqual(values, *out); } else { auto& expected_values = dynamic_cast(values); @@ -5316,6 +5470,9 @@ class TestIntegerAnnotateDecimalTypeParquetIO : public TestParquetIO { }; typedef ::testing::Types< + Decimal32WithPrecisionAndScale<1, true>, Decimal32WithPrecisionAndScale<5, true>, + Decimal64WithPrecisionAndScale<10, true>, Decimal64WithPrecisionAndScale<18, true>, + Decimal128WithPrecisionAndScale<19, true>, Decimal256WithPrecisionAndScale<39, true>, Decimal128WithPrecisionAndScale<1>, Decimal128WithPrecisionAndScale<5>, Decimal128WithPrecisionAndScale<10>, Decimal128WithPrecisionAndScale<18>, Decimal256WithPrecisionAndScale<1>, Decimal256WithPrecisionAndScale<5>, @@ -5348,7 +5505,7 @@ class TestBufferedParquetIO : public TestParquetIO { SchemaDescriptor descriptor; ASSERT_NO_THROW(descriptor.Init(schema)); std::shared_ptr<::arrow::Schema> arrow_schema; - ArrowReaderProperties props; + auto props = this->template ReaderPropertiesFromArrowType(); ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema)); std::unique_ptr writer; @@ -5373,7 +5530,8 @@ class TestBufferedParquetIO : public TestParquetIO { std::shared_ptr out; std::unique_ptr reader; - this->ReaderFromSink(&reader); + auto props = this->template ReaderPropertiesFromArrowType(); + this->ReaderFromSink(&reader, props); ASSERT_EQ(num_row_groups, reader->num_row_groups()); this->ReadSingleColumnFile(std::move(reader), &out); @@ -5384,7 +5542,8 @@ class TestBufferedParquetIO : public TestParquetIO { int num_row_groups) { std::shared_ptr<::arrow::Table> out; std::unique_ptr reader; - this->ReaderFromSink(&reader); + auto props = this->template ReaderPropertiesFromArrowType(); + this->ReaderFromSink(&reader, props); ASSERT_EQ(num_row_groups, reader->num_row_groups()); this->ReadTableFromFile(std::move(reader), &out); diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index 81d29fc7e41..10d1141cebb 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -268,6 +268,43 @@ TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) { ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); } +TEST_F(TestConvertParquetSchema, ParquetAnnotatedFieldsSmallestDecimal) { + struct FieldConstructionArguments { + std::string name; + std::shared_ptr logical_type; + 
parquet::Type::type physical_type; + int physical_length; + std::shared_ptr<::arrow::DataType> datatype; + }; + + std::vector cases = { + {"decimal(8, 2)", LogicalType::Decimal(8, 2), ParquetType::INT32, -1, + ::arrow::decimal32(8, 2)}, + {"decimal(16, 4)", LogicalType::Decimal(16, 4), ParquetType::INT64, -1, + ::arrow::decimal64(16, 4)}, + {"decimal(32, 8)", LogicalType::Decimal(32, 8), ParquetType::FIXED_LEN_BYTE_ARRAY, + 16, ::arrow::decimal128(32, 8)}, + {"decimal(73, 38)", LogicalType::Decimal(73, 38), ParquetType::FIXED_LEN_BYTE_ARRAY, + 31, ::arrow::decimal256(73, 38)}, + }; + + std::vector parquet_fields; + std::vector> arrow_fields; + + for (const FieldConstructionArguments& c : cases) { + parquet_fields.push_back(PrimitiveNode::Make(c.name, Repetition::OPTIONAL, + c.logical_type, c.physical_type, + c.physical_length)); + arrow_fields.push_back(::arrow::field(c.name, c.datatype)); + } + + auto reader_props = ArrowReaderProperties(); + reader_props.set_smallest_decimal_enabled(true); + ASSERT_OK(ConvertSchema(parquet_fields, nullptr, reader_props)); + auto arrow_schema = ::arrow::schema(arrow_fields); + ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); +} + TEST_F(TestConvertParquetSchema, DuplicateFieldNames) { std::vector parquet_fields; std::vector> arrow_fields; @@ -354,6 +391,42 @@ TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) { ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); } +TEST_F(TestConvertParquetSchema, ParquetSmallestDecimals) { + std::vector parquet_fields; + std::vector> arrow_fields; + + parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL, + ParquetType::FIXED_LEN_BYTE_ARRAY, + ConvertedType::DECIMAL, 4, 8, 4)); + arrow_fields.push_back( + ::arrow::field("flba-decimal", std::make_shared<::arrow::Decimal32Type>(8, 4))); + + parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL, + ParquetType::BYTE_ARRAY, + ConvertedType::DECIMAL, -1, 18, 4)); + arrow_fields.push_back( + ::arrow::field("binary-decimal", std::make_shared<::arrow::Decimal64Type>(18, 4))); + + parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL, + ParquetType::INT32, ConvertedType::DECIMAL, + -1, 38, 4)); + arrow_fields.push_back( + ::arrow::field("int32-decimal", std::make_shared<::arrow::Decimal128Type>(38, 4))); + + parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL, + ParquetType::INT64, ConvertedType::DECIMAL, + -1, 48, 4)); + arrow_fields.push_back( + ::arrow::field("int64-decimal", std::make_shared<::arrow::Decimal256Type>(48, 4))); + + auto arrow_schema = ::arrow::schema(arrow_fields); + auto reader_props = ArrowReaderProperties(); + reader_props.set_smallest_decimal_enabled(true); + ASSERT_OK(ConvertSchema(parquet_fields, nullptr, reader_props)); + + ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); +} + TEST_F(TestConvertParquetSchema, ParquetMaps) { std::vector parquet_fields; std::vector> arrow_fields; @@ -1157,6 +1230,52 @@ TEST_F(TestConvertArrowSchema, ArrowFields) { // ASSERT_NO_FATAL_FAILURE(); } +TEST_F(TestConvertArrowSchema, ArrowFieldsStoreSchema) { + struct FieldConstructionArguments { + std::string name; + std::shared_ptr<::arrow::DataType> datatype; + std::shared_ptr logical_type; + parquet::Type::type physical_type; + int physical_length; + }; + + std::vector cases = { + {"decimal(1, 0)", ::arrow::decimal128(1, 0), LogicalType::Decimal(1, 0), + ParquetType::FIXED_LEN_BYTE_ARRAY, 1}, + {"decimal(8, 2)", ::arrow::decimal128(8, 2), 
LogicalType::Decimal(8, 2), + ParquetType::FIXED_LEN_BYTE_ARRAY, 4}, + {"decimal(16, 4)", ::arrow::decimal128(16, 4), LogicalType::Decimal(16, 4), + ParquetType::FIXED_LEN_BYTE_ARRAY, 7}, + {"decimal(32, 8)", ::arrow::decimal128(32, 8), LogicalType::Decimal(32, 8), + ParquetType::FIXED_LEN_BYTE_ARRAY, 14}, + {"decimal(1, 0)", ::arrow::decimal32(1, 0), LogicalType::Decimal(1, 0), + ParquetType::FIXED_LEN_BYTE_ARRAY, 1}, + {"decimal(8, 2)", ::arrow::decimal32(8, 2), LogicalType::Decimal(8, 2), + ParquetType::FIXED_LEN_BYTE_ARRAY, 4}, + {"decimal(16, 4)", ::arrow::decimal64(16, 4), LogicalType::Decimal(16, 4), + ParquetType::FIXED_LEN_BYTE_ARRAY, 7}, + {"decimal(32, 8)", ::arrow::decimal128(32, 8), LogicalType::Decimal(32, 8), + ParquetType::FIXED_LEN_BYTE_ARRAY, 14}, + {"decimal(73, 38)", ::arrow::decimal256(73, 38), LogicalType::Decimal(73, 38), + ParquetType::FIXED_LEN_BYTE_ARRAY, 31}}; + + std::vector> arrow_fields; + std::vector parquet_fields; + + for (const FieldConstructionArguments& c : cases) { + arrow_fields.push_back(::arrow::field(c.name, c.datatype, false)); + parquet_fields.push_back(PrimitiveNode::Make(c.name, Repetition::REQUIRED, + c.logical_type, c.physical_type, + c.physical_length)); + } + + auto writer_props = ::parquet::default_arrow_writer_properties(); + writer_props->store_schema(); + ASSERT_OK(ConvertSchema(arrow_fields, writer_props)); + CheckFlatSchema(parquet_fields); + // ASSERT_NO_FATAL_FAILURE(); +} + TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) { struct FieldConstructionArguments { std::string name; diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 6a2519c70a0..8d328553640 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -28,6 +28,7 @@ #include #include "arrow/array.h" +#include "arrow/array/array_decimal.h" #include "arrow/compute/api.h" #include "arrow/datum.h" #include "arrow/io/memory.h" @@ -37,6 +38,7 @@ #include "arrow/status.h" #include "arrow/table.h" #include "arrow/type.h" +#include "arrow/type_fwd.h" #include "arrow/type_traits.h" #include "arrow/util/base64.h" #include "arrow/util/bit_util.h" @@ -69,6 +71,12 @@ using arrow::Decimal128Type; using arrow::Decimal256; using arrow::Decimal256Array; using arrow::Decimal256Type; +using arrow::Decimal32; +using arrow::Decimal32Array; +using arrow::Decimal32Type; +using arrow::Decimal64; +using arrow::Decimal64Array; +using arrow::Decimal64Type; using arrow::Field; using arrow::Int32Array; using arrow::ListArray; @@ -590,7 +598,8 @@ Status TransferBinary(RecordReader* reader, MemoryPool* pool, } // ---------------------------------------------------------------------- -// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128 || Decimal256 +// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY +// -> Decimal32 || Decimal64 || Decimal128 || Decimal256 template Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width, @@ -603,6 +612,16 @@ Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width, template struct DecimalTypeTrait; +template <> +struct DecimalTypeTrait<::arrow::Decimal32Array> { + using value = ::arrow::Decimal32; +}; + +template <> +struct DecimalTypeTrait<::arrow::Decimal64Array> { + using value = ::arrow::Decimal64; +}; + template <> struct DecimalTypeTrait<::arrow::Decimal128Array> { using value = ::arrow::Decimal128; @@ -721,7 +740,7 @@ struct DecimalConverter { } }; -/// \brief Convert an Int32 or Int64 array into a Decimal128Array 
+/// \brief Convert an Int32 or Int64 array into a Decimal32/64/128/256Array /// The parquet spec allows systems to write decimals in int32, int64 if the values are /// small enough to fit in less 4 bytes or less than 8 bytes, respectively. /// This function implements the conversion from int32 and int64 arrays to decimal arrays. @@ -731,9 +750,11 @@ template < std::is_same::value>> static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out) { - // Decimal128 and Decimal256 are only Arrow constructs. Parquet does not + // Decimal32 and Decimal64 are only Arrow constructs. Parquet does not // specifically distinguish between decimal byte widths. - DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128 || + DCHECK(field->type()->id() == ::arrow::Type::DECIMAL32 || + field->type()->id() == ::arrow::Type::DECIMAL64 || + field->type()->id() == ::arrow::Type::DECIMAL128 || field->type()->id() == ::arrow::Type::DECIMAL256); const int64_t length = reader->values_written(); @@ -757,7 +778,13 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, // sign/zero extend int32_t values, otherwise a no-op const auto value = static_cast(values[i]); - if constexpr (std::is_same_v) { + if constexpr (std::is_same_v) { + ::arrow::Decimal32 decimal(value); + decimal.ToBytes(out_ptr); + } else if constexpr (std::is_same_v) { + ::arrow::Decimal64 decimal(value); + decimal.ToBytes(out_ptr); + } else if constexpr (std::is_same_v) { ::arrow::Decimal128 decimal(value); decimal.ToBytes(out_ptr); } else { @@ -900,6 +927,50 @@ Status TransferColumnData(RecordReader* reader, } RETURN_NOT_OK(TransferHalfFloat(reader, pool, value_field, &result)); } break; + case ::arrow::Type::DECIMAL32: { + switch (descr->physical_type()) { + case ::parquet::Type::INT32: { + auto fn = DecimalIntegerTransfer; + RETURN_NOT_OK(fn(reader, pool, value_field, &result)); + } break; + case ::parquet::Type::BYTE_ARRAY: { + auto fn = &TransferDecimal; + RETURN_NOT_OK(fn(reader, pool, value_field, &result)); + } break; + case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { + auto fn = &TransferDecimal; + RETURN_NOT_OK(fn(reader, pool, value_field, &result)); + } break; + default: + return Status::Invalid( + "Physical type for decimal32 must be int32, byte array, or fixed length " + "binary"); + } + } break; + case ::arrow::Type::DECIMAL64: { + switch (descr->physical_type()) { + case ::parquet::Type::INT32: { + auto fn = DecimalIntegerTransfer; + RETURN_NOT_OK(fn(reader, pool, value_field, &result)); + } break; + case ::parquet::Type::INT64: { + auto fn = DecimalIntegerTransfer; + RETURN_NOT_OK(fn(reader, pool, value_field, &result)); + } break; + case ::parquet::Type::BYTE_ARRAY: { + auto fn = &TransferDecimal; + RETURN_NOT_OK(fn(reader, pool, value_field, &result)); + } break; + case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { + auto fn = &TransferDecimal; + RETURN_NOT_OK(fn(reader, pool, value_field, &result)); + } break; + default: + return Status::Invalid( + "Physical type for decimal64 must be int32, int64, byte array, or fixed " + "length binary"); + } + } break; case ::arrow::Type::DECIMAL128: { switch (descr->physical_type()) { case ::parquet::Type::INT32: { @@ -907,7 +978,7 @@ Status TransferColumnData(RecordReader* reader, RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet::Type::INT64: { - auto fn = &DecimalIntegerTransfer; + auto fn = DecimalIntegerTransfer; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case 
::parquet::Type::BYTE_ARRAY: { @@ -924,14 +995,14 @@ Status TransferColumnData(RecordReader* reader, "length binary"); } } break; - case ::arrow::Type::DECIMAL256: + case ::arrow::Type::DECIMAL256: { switch (descr->physical_type()) { case ::parquet::Type::INT32: { auto fn = DecimalIntegerTransfer; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet::Type::INT64: { - auto fn = &DecimalIntegerTransfer; + auto fn = DecimalIntegerTransfer; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet::Type::BYTE_ARRAY: { @@ -947,8 +1018,7 @@ Status TransferColumnData(RecordReader* reader, "Physical type for decimal256 must be int32, int64, byte array, or fixed " "length binary"); } - break; - + } break; case ::arrow::Type::TIMESTAMP: { const ::arrow::TimestampType& timestamp_type = checked_cast<::arrow::TimestampType&>(*value_field->type()); diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index bf46e183ae1..2f3bbfd061b 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -355,13 +355,15 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, static_cast(*field->type()); length = fixed_size_binary_type.byte_width(); } break; + case ArrowTypeId::DECIMAL32: + case ArrowTypeId::DECIMAL64: case ArrowTypeId::DECIMAL128: case ArrowTypeId::DECIMAL256: { const auto& decimal_type = static_cast(*field->type()); precision = decimal_type.precision(); scale = decimal_type.scale(); if (properties.store_decimal_as_integer() && 1 <= precision && precision <= 18) { - type = precision <= 9 ? ParquetType ::INT32 : ParquetType ::INT64; + type = precision <= 9 ? ParquetType::INT32 : ParquetType::INT64; } else { type = ParquetType::FIXED_LEN_BYTE_ARRAY; length = DecimalType::DecimalSize(precision); diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index 004f42ea869..cdde2e576fb 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -32,12 +32,17 @@ using ::arrow::Result; using ::arrow::Status; using ::arrow::internal::checked_cast; -Result> MakeArrowDecimal(const LogicalType& logical_type) { +Result> MakeArrowDecimal(const LogicalType& logical_type, + bool smallest_decimal_enabled) { const auto& decimal = checked_cast(logical_type); - if (decimal.precision() <= ::arrow::Decimal128Type::kMaxPrecision) { + + if (smallest_decimal_enabled) { + return ::arrow::smallest_decimal(decimal.precision(), decimal.scale()); + } else if (decimal.precision() <= ::arrow::Decimal128Type::kMaxPrecision) { return ::arrow::Decimal128Type::Make(decimal.precision(), decimal.scale()); + } else { + return ::arrow::Decimal256Type::Make(decimal.precision(), decimal.scale()); } - return ::arrow::Decimal256Type::Make(decimal.precision(), decimal.scale()); } Result> MakeArrowInt(const LogicalType& logical_type) { @@ -111,12 +116,14 @@ Result> MakeArrowTimestamp(const LogicalType& logical } Result> FromByteArray( - const LogicalType& logical_type, const ArrowReaderProperties& reader_properties) { + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties + +) { switch (logical_type.type()) { case LogicalType::Type::STRING: return ::arrow::utf8(); case LogicalType::Type::DECIMAL: - return MakeArrowDecimal(logical_type); + return MakeArrowDecimal(logical_type, reader_properties.smallest_decimal_enabled()); case LogicalType::Type::NONE: case LogicalType::Type::ENUM: case LogicalType::Type::BSON: @@ -134,11 
+141,12 @@ Result> FromByteArray( } } -Result> FromFLBA(const LogicalType& logical_type, - int32_t physical_length) { +Result> FromFLBA( + const LogicalType& logical_type, int32_t physical_length, + const ArrowReaderProperties& reader_properties) { switch (logical_type.type()) { case LogicalType::Type::DECIMAL: - return MakeArrowDecimal(logical_type); + return MakeArrowDecimal(logical_type, reader_properties.smallest_decimal_enabled()); case LogicalType::Type::FLOAT16: return ::arrow::float16(); case LogicalType::Type::NONE: @@ -152,7 +160,8 @@ Result> FromFLBA(const LogicalType& logical_type, } } -::arrow::Result> FromInt32(const LogicalType& logical_type) { +::arrow::Result> FromInt32( + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties) { switch (logical_type.type()) { case LogicalType::Type::INT: return MakeArrowInt(logical_type); @@ -161,7 +170,7 @@ ::arrow::Result> FromInt32(const LogicalType& logical case LogicalType::Type::TIME: return MakeArrowTime32(logical_type); case LogicalType::Type::DECIMAL: - return MakeArrowDecimal(logical_type); + return MakeArrowDecimal(logical_type, reader_properties.smallest_decimal_enabled()); case LogicalType::Type::NONE: return ::arrow::int32(); default: @@ -170,12 +179,13 @@ ::arrow::Result> FromInt32(const LogicalType& logical } } -Result> FromInt64(const LogicalType& logical_type) { +Result> FromInt64( + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties) { switch (logical_type.type()) { case LogicalType::Type::INT: return MakeArrowInt64(logical_type); case LogicalType::Type::DECIMAL: - return MakeArrowDecimal(logical_type); + return MakeArrowDecimal(logical_type, reader_properties.smallest_decimal_enabled()); case LogicalType::Type::TIMESTAMP: return MakeArrowTimestamp(logical_type); case LogicalType::Type::TIME: @@ -204,9 +214,9 @@ Result> GetArrowType( case ParquetType::BOOLEAN: return ::arrow::boolean(); case ParquetType::INT32: - return FromInt32(logical_type); + return FromInt32(logical_type, reader_properties); case ParquetType::INT64: - return FromInt64(logical_type); + return FromInt64(logical_type, reader_properties); case ParquetType::INT96: return ::arrow::timestamp(reader_properties.coerce_int96_timestamp_unit()); case ParquetType::FLOAT: @@ -216,7 +226,7 @@ Result> GetArrowType( case ParquetType::BYTE_ARRAY: return FromByteArray(logical_type, reader_properties); case ParquetType::FIXED_LEN_BYTE_ARRAY: - return FromFLBA(logical_type, type_length); + return FromFLBA(logical_type, type_length, reader_properties); default: { // PARQUET-1565: This can occur if the file is corrupt return Status::IOError("Invalid physical column type: ", diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h index 58828f85ab8..ed16bf66daa 100644 --- a/cpp/src/parquet/arrow/schema_internal.h +++ b/cpp/src/parquet/arrow/schema_internal.h @@ -19,6 +19,7 @@ #include "arrow/result.h" #include "arrow/type_fwd.h" +#include "parquet/properties.h" #include "parquet/schema.h" namespace arrow { @@ -29,12 +30,17 @@ namespace parquet::arrow { using ::arrow::Result; -Result> FromByteArray(const LogicalType& logical_type, - bool use_known_arrow_extensions); -Result> FromFLBA(const LogicalType& logical_type, - int32_t physical_length); -Result> FromInt32(const LogicalType& logical_type); -Result> FromInt64(const LogicalType& logical_type); +Result> FromByteArray( + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties); +Result> FromFLBA( + const 
LogicalType& logical_type, int32_t physical_length, + const ArrowReaderProperties& reader_properties); +Result> FromInt32( + const LogicalType& logical_type, + const ArrowReaderProperties& reader_properties = default_arrow_reader_properties()); +Result> FromInt64( + const LogicalType& logical_type, + const ArrowReaderProperties& reader_properties = default_arrow_reader_properties()); Result> GetArrowType( Type::type physical_type, const LogicalType& logical_type, int type_length, diff --git a/cpp/src/parquet/arrow/test_util.h b/cpp/src/parquet/arrow/test_util.h index cfc57ce6ea7..d4665558796 100644 --- a/cpp/src/parquet/arrow/test_util.h +++ b/cpp/src/parquet/arrow/test_util.h @@ -47,24 +47,58 @@ using ::arrow::Array; using ::arrow::ChunkedArray; using ::arrow::Status; -template -struct Decimal128WithPrecisionAndScale { - static_assert(PRECISION >= 1 && PRECISION <= 38, "Invalid precision value"); +struct BaseDecimalWithPrecisionAndScale {}; + +template +struct Decimal32WithPrecisionAndScale : BaseDecimalWithPrecisionAndScale { + static_assert(PRECISION >= ::arrow::Decimal32Type::kMinPrecision && + PRECISION <= ::arrow::Decimal32Type::kMaxPrecision, + "Invalid precision value"); + + using type = ::arrow::Decimal32Type; + static constexpr ::arrow::Type::type type_id = ::arrow::Decimal32Type::type_id; + static constexpr int32_t precision = PRECISION; + static constexpr int32_t scale = PRECISION - 1; + static constexpr bool smallest_decimal_enabled = SMALLEST_DECIMAL_ENABLED; +}; + +template +struct Decimal64WithPrecisionAndScale : BaseDecimalWithPrecisionAndScale { + static_assert(PRECISION >= ::arrow::Decimal64Type::kMinPrecision && + PRECISION <= ::arrow::Decimal64Type::kMaxPrecision, + "Invalid precision value"); + + using type = ::arrow::Decimal64Type; + static constexpr ::arrow::Type::type type_id = ::arrow::Decimal64Type::type_id; + static constexpr int32_t precision = PRECISION; + static constexpr int32_t scale = PRECISION - 1; + static constexpr bool smallest_decimal_enabled = SMALLEST_DECIMAL_ENABLED; +}; + +template +struct Decimal128WithPrecisionAndScale : BaseDecimalWithPrecisionAndScale { + static_assert(PRECISION >= ::arrow::Decimal128Type::kMinPrecision && + PRECISION <= ::arrow::Decimal128Type::kMaxPrecision, + "Invalid precision value"); using type = ::arrow::Decimal128Type; static constexpr ::arrow::Type::type type_id = ::arrow::Decimal128Type::type_id; static constexpr int32_t precision = PRECISION; static constexpr int32_t scale = PRECISION - 1; + static constexpr bool smallest_decimal_enabled = SMALLEST_DECIMAL_ENABLED; }; -template -struct Decimal256WithPrecisionAndScale { - static_assert(PRECISION >= 1 && PRECISION <= 76, "Invalid precision value"); +template +struct Decimal256WithPrecisionAndScale : BaseDecimalWithPrecisionAndScale { + static_assert(PRECISION >= ::arrow::Decimal256Type::kMinPrecision && + PRECISION <= ::arrow::Decimal256Type::kMaxPrecision, + "Invalid precision value"); using type = ::arrow::Decimal256Type; static constexpr ::arrow::Type::type type_id = ::arrow::Decimal256Type::type_id; static constexpr int32_t precision = PRECISION; static constexpr int32_t scale = PRECISION - 1; + static constexpr bool smallest_decimal_enabled = SMALLEST_DECIMAL_ENABLED; }; template @@ -156,7 +190,57 @@ static void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t template ::arrow::enable_if_t< - std::is_same>::value, Status> + std::is_same>::value || + std::is_same>::value, + Status> +NonNullArray(size_t size, std::shared_ptr* out) { + constexpr 
int32_t kDecimalPrecision = precision; + constexpr int32_t kDecimalScale = Decimal32WithPrecisionAndScale::scale; + + const auto type = ::arrow::decimal32(kDecimalPrecision, kDecimalScale); + ::arrow::Decimal32Builder builder(type); + const int32_t byte_width = + static_cast(*type).byte_width(); + + constexpr int32_t seed = 0; + + ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); + random_decimals<::arrow::Decimal32Type::kByteWidth>(size, seed, kDecimalPrecision, + out_buf->mutable_data()); + + RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size)); + return builder.Finish(out); +} + +template +::arrow::enable_if_t< + std::is_same>::value || + std::is_same>::value, + Status> +NonNullArray(size_t size, std::shared_ptr* out) { + constexpr int32_t kDecimalPrecision = precision; + constexpr int32_t kDecimalScale = Decimal64WithPrecisionAndScale::scale; + + const auto type = ::arrow::decimal64(kDecimalPrecision, kDecimalScale); + ::arrow::Decimal64Builder builder(type); + const int32_t byte_width = + static_cast(*type).byte_width(); + + constexpr int32_t seed = 0; + + ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); + random_decimals<::arrow::Decimal64Type::kByteWidth>(size, seed, kDecimalPrecision, + out_buf->mutable_data()); + + RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size)); + return builder.Finish(out); +} + +template +::arrow::enable_if_t< + std::is_same>::value || + std::is_same>::value, + Status> NonNullArray(size_t size, std::shared_ptr* out) { constexpr int32_t kDecimalPrecision = precision; constexpr int32_t kDecimalScale = Decimal128WithPrecisionAndScale::scale; @@ -178,7 +262,9 @@ NonNullArray(size_t size, std::shared_ptr* out) { template ::arrow::enable_if_t< - std::is_same>::value, Status> + std::is_same>::value || + std::is_same>::value, + Status> NonNullArray(size_t size, std::shared_ptr* out) { constexpr int32_t kDecimalPrecision = precision; constexpr int32_t kDecimalScale = Decimal256WithPrecisionAndScale::scale; @@ -345,7 +431,67 @@ ::arrow::enable_if_fixed_size_binary NullableArray( template ::arrow::enable_if_t< - std::is_same>::value, Status> + std::is_same>::value || + std::is_same>::value, + Status> +NullableArray(size_t size, size_t num_nulls, uint32_t seed, + std::shared_ptr<::arrow::Array>* out) { + std::vector valid_bytes(size, '\1'); + + for (size_t i = 0; i < num_nulls; ++i) { + valid_bytes[i * 2] = '\0'; + } + + constexpr int32_t kDecimalPrecision = precision; + constexpr int32_t kDecimalScale = Decimal32WithPrecisionAndScale::scale; + const auto type = ::arrow::decimal32(kDecimalPrecision, kDecimalScale); + const int32_t byte_width = + static_cast(*type).byte_width(); + + ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); + + random_decimals<::arrow::Decimal32Type::kByteWidth>(size, seed, precision, + out_buf->mutable_data()); + + ::arrow::Decimal32Builder builder(type); + RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data())); + return builder.Finish(out); +} + +template +::arrow::enable_if_t< + std::is_same>::value || + std::is_same>::value, + Status> +NullableArray(size_t size, size_t num_nulls, uint32_t seed, + std::shared_ptr<::arrow::Array>* out) { + std::vector valid_bytes(size, '\1'); + + for (size_t i = 0; i < num_nulls; ++i) { + valid_bytes[i * 2] = '\0'; + } + + constexpr int32_t kDecimalPrecision = precision; + constexpr int32_t kDecimalScale = Decimal64WithPrecisionAndScale::scale; + const auto type = 
::arrow::decimal64(kDecimalPrecision, kDecimalScale); + const int32_t byte_width = + static_cast(*type).byte_width(); + + ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); + + random_decimals<::arrow::Decimal64Type::kByteWidth>(size, seed, precision, + out_buf->mutable_data()); + + ::arrow::Decimal64Builder builder(type); + RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data())); + return builder.Finish(out); +} + +template +::arrow::enable_if_t< + std::is_same>::value || + std::is_same>::value, + Status> NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) { std::vector valid_bytes(size, '\1'); @@ -372,7 +518,9 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, template ::arrow::enable_if_t< - std::is_same>::value, Status> + std::is_same>::value || + std::is_same>::value, + Status> NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) { std::vector valid_bytes(size, '\1'); diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index ce44aef3232..2e655bec1ce 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -2008,10 +2008,20 @@ struct SerializeFunctor< template value_type TransferValue(const uint8_t* in) const { - static_assert(byte_width == 16 || byte_width == 32, - "only 16 and 32 byte Decimals supported"); + static_assert(byte_width == ::arrow::Decimal32Type::kByteWidth || + byte_width == ::arrow::Decimal64Type::kByteWidth || + byte_width == ::arrow::Decimal128Type::kByteWidth || + byte_width == ::arrow::Decimal256Type::kByteWidth, + "only 4/8/16/32 byte Decimals supported"); + value_type value = 0; - if constexpr (byte_width == 16) { + if constexpr (byte_width == ::arrow::Decimal32Type::kByteWidth) { + ::arrow::Decimal32 decimal_value(in); + PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value)); + } else if constexpr (byte_width == ::arrow::Decimal64Type::kByteWidth) { + ::arrow::Decimal64 decimal_value(in); + PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value)); + } else if constexpr (byte_width == ::arrow::Decimal128Type::kByteWidth) { ::arrow::Decimal128 decimal_value(in); PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value)); } else { @@ -2057,6 +2067,8 @@ Status TypedColumnWriterImpl::WriteArrowDense( WRITE_ZERO_COPY_CASE(DATE32) WRITE_SERIALIZE_CASE(DATE64) WRITE_SERIALIZE_CASE(TIME32) + WRITE_SERIALIZE_CASE(DECIMAL32) + WRITE_SERIALIZE_CASE(DECIMAL64) WRITE_SERIALIZE_CASE(DECIMAL128) WRITE_SERIALIZE_CASE(DECIMAL256) default: @@ -2228,6 +2240,7 @@ Status TypedColumnWriterImpl::WriteArrowDense( WRITE_SERIALIZE_CASE(UINT64) WRITE_ZERO_COPY_CASE(TIME64) WRITE_ZERO_COPY_CASE(DURATION) + WRITE_SERIALIZE_CASE(DECIMAL64) WRITE_SERIALIZE_CASE(DECIMAL128) WRITE_SERIALIZE_CASE(DECIMAL256) default: @@ -2377,7 +2390,7 @@ struct SerializeFunctor< } // Parquet's Decimal are stored with FixedLength values where the length is - // proportional to the precision. Arrow's Decimal are always stored with 16/32 + // proportional to the precision. Arrow's Decimal are always stored with 4/8/16/32 // bytes. Thus the internal FLBA pointer must be adjusted by the offset calculated // here. 
int32_t Offset(const Array& array) { @@ -2391,29 +2404,45 @@ struct SerializeFunctor< int64_t non_null_count = array.length() - array.null_count(); int64_t size = non_null_count * ArrowType::kByteWidth; scratch_buffer = AllocateBuffer(ctx->memory_pool, size); - scratch = reinterpret_cast(scratch_buffer->mutable_data()); + scratch_i32 = reinterpret_cast(scratch_buffer->mutable_data()); + scratch_i64 = reinterpret_cast(scratch_buffer->mutable_data()); } template FixedLenByteArray FixDecimalEndianness(const uint8_t* in, int64_t offset) { + static_assert(byte_width == ::arrow::Decimal32Type::kByteWidth || + byte_width == ::arrow::Decimal64Type::kByteWidth || + byte_width == ::arrow::Decimal128Type::kByteWidth || + byte_width == ::arrow::Decimal256Type::kByteWidth, + "only 4/8/16/32 byte Decimals supported"); + + if constexpr (byte_width == ::arrow::Decimal32Type::kByteWidth) { + const auto* u32_in = reinterpret_cast(in); + auto out = reinterpret_cast(scratch_i32) + offset; + *scratch_i32++ = ::arrow::bit_util::ToBigEndian(u32_in[0]); + return FixedLenByteArray(out); + } + const auto* u64_in = reinterpret_cast(in); - auto out = reinterpret_cast(scratch) + offset; - static_assert(byte_width == 16 || byte_width == 32, - "only 16 and 32 byte Decimals supported"); - if (byte_width == 32) { - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[3]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[2]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + auto out = reinterpret_cast(scratch_i64) + offset; + if constexpr (byte_width == ::arrow::Decimal64Type::kByteWidth) { + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + } else if constexpr (byte_width == ::arrow::Decimal128Type::kByteWidth) { + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); } else { - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[3]); + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[2]); + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); } + return FixedLenByteArray(out); } std::shared_ptr scratch_buffer; - int64_t* scratch; + int32_t* scratch_i32; + int64_t* scratch_i64; }; // ---------------------------------------------------------------------- @@ -2449,6 +2478,8 @@ Status TypedColumnWriterImpl::WriteArrowDense( const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) { switch (array.type()->id()) { WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY) + WRITE_SERIALIZE_CASE(DECIMAL32) + WRITE_SERIALIZE_CASE(DECIMAL64) WRITE_SERIALIZE_CASE(DECIMAL128) WRITE_SERIALIZE_CASE(DECIMAL256) WRITE_SERIALIZE_CASE(HALF_FLOAT) diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 19436b84a37..c0c56f7ce47 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -914,7 +914,8 @@ class PARQUET_EXPORT ArrowReaderProperties { cache_options_(::arrow::io::CacheOptions::LazyDefaults()), coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO), arrow_extensions_enabled_(false), - should_load_statistics_(false) {} + should_load_statistics_(false), + smallest_decimal_enabled_(false) {} /// \brief Set whether to use the IO thread pool to parse columns in parallel. 
   ///
@@ -1006,6 +1007,15 @@ class PARQUET_EXPORT ArrowReaderProperties {
   /// Return whether loading statistics as much as possible.
   bool should_load_statistics() const { return should_load_statistics_; }
 
+  /// \brief When enabled, Parquet decimal logical types will always be
+  /// mapped to the smallest Arrow decimal type that fits the precision.
+  ///
+  /// Default is false.
+  void set_smallest_decimal_enabled(bool smallest_decimal_enabled) {
+    smallest_decimal_enabled_ = smallest_decimal_enabled;
+  }
+  bool smallest_decimal_enabled() const { return smallest_decimal_enabled_; }
+
  private:
   bool use_threads_;
   std::unordered_set<int> read_dict_indices_;
@@ -1016,6 +1026,7 @@ class PARQUET_EXPORT ArrowReaderProperties {
   ::arrow::TimeUnit::type coerce_int96_timestamp_unit_;
   bool arrow_extensions_enabled_;
   bool should_load_statistics_;
+  bool smallest_decimal_enabled_;
 };
 
 /// EXPERIMENTAL: Constructs the default ArrowReaderProperties
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 45db755d903..b144a472d81 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -703,7 +703,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
     cache_options : pyarrow.CacheOptions, default None
         Cache options used when pre_buffer is enabled. The default values should
         be good for most use cases. You may want to adjust these for example if
-        you have exceptionally high latency to the file system. 
+        you have exceptionally high latency to the file system.
     thrift_string_size_limit : int, default None
         If not None, override the maximum total string size allocated
         when decoding Thrift structures. The default limit should be
@@ -720,6 +720,9 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         Parquet file.
     page_checksum_verification : bool, default False
        If True, verify the page checksum for each page read from the file.
+    smallest_decimal_enabled : bool, default False
+        If True, always convert to the smallest Arrow decimal type based
+        on precision.
     """
 
     # Avoid mistakingly creating attributes
@@ -733,7 +736,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
                  thrift_container_size_limit=None,
                  decryption_config=None,
                  decryption_properties=None,
-                 bint page_checksum_verification=False):
+                 bint page_checksum_verification=False,
+                 bint smallest_decimal_enabled=False):
         self.init(shared_ptr[CFragmentScanOptions](
             new CParquetFragmentScanOptions()))
         self.use_buffered_stream = use_buffered_stream
@@ -752,6 +756,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         if decryption_properties is not None:
             self.decryption_properties = decryption_properties
         self.page_checksum_verification = page_checksum_verification
+        self.smallest_decimal_enabled = smallest_decimal_enabled
 
     cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
         FragmentScanOptions.init(self, sp)
@@ -868,6 +873,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
     def page_checksum_verification(self, bint page_checksum_verification):
         self.reader_properties().set_page_checksum_verification(page_checksum_verification)
 
+    @property
+    def smallest_decimal_enabled(self):
+        return self.arrow_reader_properties().smallest_decimal_enabled()
+
+    @smallest_decimal_enabled.setter
+    def smallest_decimal_enabled(self, bint smallest_decimal_enabled):
+        self.arrow_reader_properties().set_smallest_decimal_enabled(smallest_decimal_enabled)
+
     def equals(self, ParquetFragmentScanOptions other):
         """
         Parameters
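On the dataset path, the new flag travels through ParquetFragmentScanOptions. A usage sketch under the same assumptions as above (a patched pyarrow build and the illustrative decimals.parquet file):

```python
import pyarrow.dataset as ds

# Opt in to the smallest-decimal mapping for all Parquet fragments in the dataset.
scan_options = ds.ParquetFragmentScanOptions(smallest_decimal_enabled=True)
fmt = ds.ParquetFileFormat(default_fragment_scan_options=scan_options)

dataset = ds.dataset("decimals.parquet", format=fmt)
# With the option set, decimal columns come back as decimal32/decimal64 whenever
# the stored precision fits, instead of always widening to decimal128.
print(dataset.to_table().schema)
```
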
""" # Avoid mistakingly creating attributes @@ -733,7 +736,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): thrift_container_size_limit=None, decryption_config=None, decryption_properties=None, - bint page_checksum_verification=False): + bint page_checksum_verification=False, + bint smallest_decimal_enabled=False): self.init(shared_ptr[CFragmentScanOptions]( new CParquetFragmentScanOptions())) self.use_buffered_stream = use_buffered_stream @@ -752,6 +756,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): if decryption_properties is not None: self.decryption_properties = decryption_properties self.page_checksum_verification = page_checksum_verification + self.smallest_decimal_enabled = smallest_decimal_enabled cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): FragmentScanOptions.init(self, sp) @@ -868,6 +873,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): def page_checksum_verification(self, bint page_checksum_verification): self.reader_properties().set_page_checksum_verification(page_checksum_verification) + @property + def smallest_decimal_enabled(self): + return self.arrow_reader_properties().smallest_decimal_enabled() + + @smallest_decimal_enabled.setter + def smallest_decimal_enabled(self, bint smallest_decimal_enabled): + self.arrow_reader_properties().set_smallest_decimal_enabled(smallest_decimal_enabled) + def equals(self, ParquetFragmentScanOptions other): """ Parameters diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index e6de9712f83..dbe30198bbd 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -404,6 +404,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil: CCacheOptions cache_options() const void set_coerce_int96_timestamp_unit(TimeUnit unit) TimeUnit coerce_int96_timestamp_unit() const + void set_smallest_decimal_enabled(c_bool smallest_decimal_enabled) + c_bool smallest_decimal_enabled() const ArrowReaderProperties default_arrow_reader_properties() diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 55c2866243b..5e4bed6406d 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -1452,7 +1452,8 @@ cdef class ParquetReader(_Weakrefable): FileDecryptionProperties decryption_properties=None, thrift_string_size_limit=None, thrift_container_size_limit=None, - page_checksum_verification=False): + page_checksum_verification=False, + smallest_decimal_enabled=False): """ Open a parquet file for reading. @@ -1469,6 +1470,7 @@ cdef class ParquetReader(_Weakrefable): thrift_string_size_limit : int, optional thrift_container_size_limit : int, optional page_checksum_verification : bool, default False + smallest_decimal_enabled : bool, default False """ cdef: shared_ptr[CFileMetaData] c_metadata @@ -1508,6 +1510,7 @@ cdef class ParquetReader(_Weakrefable): decryption_properties.unwrap()) arrow_props.set_pre_buffer(pre_buffer) + arrow_props.set_smallest_decimal_enabled(smallest_decimal_enabled) properties.set_page_checksum_verification(page_checksum_verification) diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 8d3dec96a6b..2d01d73439c 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -254,6 +254,9 @@ class ParquetFile: it will be parsed as an URI to determine the filesystem. page_checksum_verification : bool, default False If True, verify the checksum for each page read from the file. 
+    smallest_decimal_enabled : bool, default False
+        If True, always convert to the smallest Arrow decimal type based
+        on precision.
 
     Examples
     --------
@@ -302,7 +305,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
                  pre_buffer=False, coerce_int96_timestamp_unit=None,
                  decryption_properties=None, thrift_string_size_limit=None,
                  thrift_container_size_limit=None, filesystem=None,
-                 page_checksum_verification=False):
+                 page_checksum_verification=False, smallest_decimal_enabled=False):
 
         self._close_source = getattr(source, 'closed', True)
 
@@ -322,6 +325,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
             thrift_string_size_limit=thrift_string_size_limit,
             thrift_container_size_limit=thrift_container_size_limit,
             page_checksum_verification=page_checksum_verification,
+            smallest_decimal_enabled=smallest_decimal_enabled,
         )
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
@@ -1264,6 +1268,9 @@ class ParquetDataset:
     sufficient for most Parquet files.
 page_checksum_verification : bool, default False
     If True, verify the page checksum for each page read from the file.
+smallest_decimal_enabled : bool, default False
+    If True, always convert to the smallest Arrow decimal type based
+    on precision.
 
 Examples
 --------
@@ -1276,7 +1283,8 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, *, filters=None,
                  coerce_int96_timestamp_unit=None,
                  decryption_properties=None, thrift_string_size_limit=None,
                  thrift_container_size_limit=None,
-                 page_checksum_verification=False):
+                 page_checksum_verification=False,
+                 smallest_decimal_enabled=False):
 
         import pyarrow.dataset as ds
 
@@ -1287,6 +1295,7 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, *, filters=None,
             "thrift_string_size_limit": thrift_string_size_limit,
             "thrift_container_size_limit": thrift_container_size_limit,
             "page_checksum_verification": page_checksum_verification,
+            "smallest_decimal_enabled": smallest_decimal_enabled,
         }
         if buffer_size:
             read_options.update(use_buffered_stream=True,
@@ -1674,6 +1683,9 @@ def partitioning(self):
     sufficient for most Parquet files.
 page_checksum_verification : bool, default False
     If True, verify the checksum for each page read from the file.
+smallest_decimal_enabled : bool, default False
+    If True, always convert to the smallest Arrow decimal type based
+    on precision.
 
 Returns
 -------
@@ -1768,7 +1780,8 @@ def read_table(source, *, columns=None, use_threads=True,
                pre_buffer=True, coerce_int96_timestamp_unit=None,
                decryption_properties=None, thrift_string_size_limit=None,
                thrift_container_size_limit=None,
-               page_checksum_verification=False):
+               page_checksum_verification=False,
+               smallest_decimal_enabled=False):
     try:
         dataset = ParquetDataset(
@@ -1787,6 +1800,7 @@ def read_table(source, *, columns=None, use_threads=True,
             thrift_string_size_limit=thrift_string_size_limit,
             thrift_container_size_limit=thrift_container_size_limit,
             page_checksum_verification=page_checksum_verification,
+            smallest_decimal_enabled=smallest_decimal_enabled,
         )
     except ImportError:
         # fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -1819,6 +1833,7 @@ def read_table(source, *, columns=None, use_threads=True,
             thrift_string_size_limit=thrift_string_size_limit,
             thrift_container_size_limit=thrift_container_size_limit,
             page_checksum_verification=page_checksum_verification,
+            smallest_decimal_enabled=smallest_decimal_enabled,
         )
 
     return dataset.read(columns=columns, use_threads=use_threads,
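The same switch is exposed on the high-level pyarrow.parquet readers. A short sketch, again assuming the illustrative decimals.parquet file from the earlier example:

```python
import pyarrow.parquet as pq

# read_table forwards the flag through ParquetDataset (or the ParquetFile fallback).
t = pq.read_table("decimals.parquet", smallest_decimal_enabled=True)
print(t.schema)  # expected: d32 -> decimal32(5, 2), d64 -> decimal64(16, 9)

# ParquetFile accepts the same keyword for single-file reads.
pf = pq.ParquetFile("decimals.parquet", smallest_decimal_enabled=True)
print(pf.schema_arrow)
```
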
diff --git a/python/pyarrow/src/arrow/python/arrow_to_pandas.cc b/python/pyarrow/src/arrow/python/arrow_to_pandas.cc
index f694eb82baa..ee06da398af 100644
--- a/python/pyarrow/src/arrow/python/arrow_to_pandas.cc
+++ b/python/pyarrow/src/arrow/python/arrow_to_pandas.cc
@@ -182,6 +182,8 @@ static inline bool ListTypeSupported(const DataType& type) {
     case Type::HALF_FLOAT:
     case Type::FLOAT:
     case Type::DOUBLE:
+    case Type::DECIMAL32:
+    case Type::DECIMAL64:
    case Type::DECIMAL128:
     case Type::DECIMAL256:
     case Type::BINARY:
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index 43fddd413a0..92f962cf897 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -362,6 +362,20 @@ def test_byte_stream_split():
                      use_dictionary=False)
 
 
+def test_smallest_decimal_enabled(tempdir):
+    arr1 = pa.array(list(map(Decimal, range(100))), type=pa.decimal32(5, 2))
+    arr2 = pa.array(list(map(Decimal, range(100))), type=pa.decimal64(16, 9))
+    arr3 = pa.array(list(map(Decimal, range(100))), type=pa.decimal128(22, 2))
+    arr4 = pa.array(list(map(Decimal, range(100))), type=pa.decimal256(48, 2))
+    data_decimal = [arr1, arr2, arr3, arr4]
+    table = pa.Table.from_arrays(data_decimal, names=['a', 'b', 'c', 'd'])
+
+    # Check with smallest_decimal_enabled
+    _check_roundtrip(table,
+                     expected=table,
+                     read_table_kwargs={"smallest_decimal_enabled": True})
+
+
 def test_store_decimal_as_integer(tempdir):
     arr_decimal_1_9 = pa.array(list(map(Decimal, range(100))),
                                type=pa.decimal128(5, 2))