diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 4942336a126..a7afd065b59 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -613,6 +613,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
             write_page_index=self._properties["write_page_index"],
             write_page_checksum=self._properties["write_page_checksum"],
             sorting_columns=self._properties["sorting_columns"],
+            store_decimal_as_integer=self._properties["store_decimal_as_integer"],
         )
 
     def _set_arrow_properties(self):
@@ -664,6 +665,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
             encryption_config=None,
             write_page_checksum=False,
             sorting_columns=None,
+            store_decimal_as_integer=False,
         )
         self._set_properties()
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 1bfa505c544..c2ee7d3aedf 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -431,6 +431,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
             Builder* disable_statistics()
             Builder* enable_statistics()
             Builder* enable_statistics(const c_string& path)
+            Builder* enable_store_decimal_as_integer()
+            Builder* disable_store_decimal_as_integer()
             Builder* data_pagesize(int64_t size)
             Builder* encoding(ParquetEncoding encoding)
             Builder* encoding(const c_string& path,
@@ -594,6 +596,7 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
     write_page_index=*,
     write_page_checksum=*,
     sorting_columns=*,
+    store_decimal_as_integer=*,
 ) except *
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 414f0cef4e5..e942d7bb842 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1830,7 +1830,9 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
         dictionary_pagesize_limit=None,
         write_page_index=False,
         write_page_checksum=False,
-        sorting_columns=None) except *:
+        sorting_columns=None,
+        store_decimal_as_integer=False) except *:
+    """General writer properties"""
     cdef:
         shared_ptr[WriterProperties] properties
 
@@ -1941,6 +1943,16 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
                 "'use_byte_stream_split' cannot be passed"
                 "together with 'column_encoding'")
 
+    # store_decimal_as_integer
+
+    if isinstance(store_decimal_as_integer, bool):
+        if store_decimal_as_integer:
+            props.enable_store_decimal_as_integer()
+        else:
+            props.disable_store_decimal_as_integer()
+    else:
+        raise TypeError("'store_decimal_as_integer' must be a boolean")
+
     # column_encoding
     # encoding map - encode individual columns
 
@@ -2114,6 +2126,7 @@ cdef class ParquetWriter(_Weakrefable):
         int64_t write_batch_size
         int64_t dictionary_pagesize_limit
         object store_schema
+        object store_decimal_as_integer
 
     def __cinit__(self, where, Schema schema not None, use_dictionary=None,
                   compression=None, version=None,
@@ -2135,7 +2148,8 @@ cdef class ParquetWriter(_Weakrefable):
                   store_schema=True,
                   write_page_index=False,
                   write_page_checksum=False,
-                  sorting_columns=None):
+                  sorting_columns=None,
+                  store_decimal_as_integer=False):
         cdef:
             shared_ptr[WriterProperties] properties
             shared_ptr[ArrowWriterProperties] arrow_properties
@@ -2169,6 +2183,7 @@ cdef class ParquetWriter(_Weakrefable):
             write_page_index=write_page_index,
             write_page_checksum=write_page_checksum,
             sorting_columns=sorting_columns,
+            store_decimal_as_integer=store_decimal_as_integer,
         )
         arrow_properties = _create_arrow_writer_properties(
             use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
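A minimal usage sketch of the plumbing above, assuming a pyarrow build that includes this change: the `store_decimal_as_integer` keyword flows from `ParquetWriter` through `_create_writer_properties` into the C++ builder's `enable_store_decimal_as_integer()` / `disable_store_decimal_as_integer()`. The file name is illustrative.

    import pyarrow as pa
    import pyarrow.parquet as pq
    from decimal import Decimal

    # decimal128(5, 2) has precision 5, so with the flag enabled it
    # qualifies for INT32 physical storage
    table = pa.table({"d": pa.array([Decimal("1.23"), Decimal("4.56")],
                                    type=pa.decimal128(5, 2))})
    with pq.ParquetWriter("decimals.parquet", table.schema,
                          store_decimal_as_integer=True) as writer:
        writer.write_table(table)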
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index eaff79c8b13..31b3ad57da6 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -873,6 +873,23 @@ def _sanitize_table(table, new_schema, flavor):
     Specify the sort order of the data being written. The writer does not
     sort the data nor does it verify that the data is sorted. The sort
     order is written to the row group metadata, which can then be used by
     readers.
+store_decimal_as_integer : bool, default False
+    Allow decimals with 1 <= precision <= 18 to be stored as integers.
+    In Parquet, DECIMAL can be stored in any of the following physical types:
+    - int32: for 1 <= precision <= 9.
+    - int64: for 10 <= precision <= 18.
+    - fixed_len_byte_array: precision is limited by the array size.
+      Length n can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits.
+    - binary: precision is unlimited. The minimum number of bytes to store the
+      unscaled value is used.
+
+    By default, this is DISABLED and all decimal types annotate fixed_len_byte_array.
+    When enabled, the writer will use the following physical types to store decimals:
+    - int32: for 1 <= precision <= 9.
+    - int64: for 10 <= precision <= 18.
+    - fixed_len_byte_array: for precision > 18.
+
+    As a consequence, decimal columns stored in integer types are more compact.
 """
 
 _parquet_writer_example_doc = """\
@@ -968,6 +985,7 @@ def __init__(self, where, schema, filesystem=None,
                  write_page_index=False,
                  write_page_checksum=False,
                  sorting_columns=None,
+                 store_decimal_as_integer=False,
                  **options):
         if use_deprecated_int96_timestamps is None:
             # Use int96 timestamps for Spark
@@ -1020,6 +1038,7 @@ def __init__(self, where, schema, filesystem=None,
             write_page_index=write_page_index,
             write_page_checksum=write_page_checksum,
             sorting_columns=sorting_columns,
+            store_decimal_as_integer=store_decimal_as_integer,
             **options)
         self.is_open = True
 
@@ -1873,6 +1892,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
                 write_page_index=False,
                 write_page_checksum=False,
                 sorting_columns=None,
+                store_decimal_as_integer=False,
                 **kwargs):
     # Implementor's note: when adding keywords here / updating defaults, also
    # update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions
@@ -1903,6 +1923,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
                 write_page_index=write_page_index,
                 write_page_checksum=write_page_checksum,
                 sorting_columns=sorting_columns,
+                store_decimal_as_integer=store_decimal_as_integer,
                 **kwargs) as writer:
             writer.write_table(table, row_group_size=row_group_size)
     except Exception:
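The capacity formula quoted in the new docstring can be sanity-checked directly. A small sketch (the helper name `flba_max_precision` is illustrative, not part of the patch): an n-byte fixed_len_byte_array holds at most floor(log10(2^(8n-1) - 1)) base-10 digits, which is why 4 and 8 bytes line up with the int32 and int64 precision cut-offs of 9 and 18.

    import math

    def flba_max_precision(n: int) -> int:
        # max base-10 digits representable in an n-byte two's-complement value
        return math.floor(math.log10(2 ** (8 * n - 1) - 1))

    assert flba_max_precision(4) == 9    # matches the int32 range (precision <= 9)
    assert flba_max_precision(8) == 18   # matches the int64 range (precision <= 18)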
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index 56b967a0595..194af7415e8 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -15,10 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
 from collections import OrderedDict
 import io
 import warnings
 from shutil import copytree
+from decimal import Decimal
 
 import numpy as np
 import pytest
@@ -357,6 +359,60 @@ def test_byte_stream_split():
                      use_dictionary=False)
 
 
+def test_store_decimal_as_integer(tempdir):
+    arr_decimal_1_9 = pa.array(list(map(Decimal, range(100))),
+                               type=pa.decimal128(5, 2))
+    arr_decimal_10_18 = pa.array(list(map(Decimal, range(100))),
+                                 type=pa.decimal128(16, 9))
+    arr_decimal_gt18 = pa.array(list(map(Decimal, range(100))),
+                                type=pa.decimal128(22, 2))
+    arr_bool = pa.array([True, False] * 50)
+    data_decimal = [arr_decimal_1_9, arr_decimal_10_18, arr_decimal_gt18]
+    table = pa.Table.from_arrays(data_decimal, names=['a', 'b', 'c'])
+
+    # Check with store_decimal_as_integer.
+    _check_roundtrip(table,
+                     expected=table,
+                     compression="gzip",
+                     use_dictionary=False,
+                     store_decimal_as_integer=True)
+
+    # Check physical type in parquet schema
+    pqtestfile_path = os.path.join(tempdir, 'test.parquet')
+    pq.write_table(table, pqtestfile_path,
+                   compression="gzip",
+                   use_dictionary=False,
+                   store_decimal_as_integer=True)
+
+    pqtestfile = pq.ParquetFile(pqtestfile_path)
+    pqcol_decimal_1_9 = pqtestfile.schema.column(0)
+    pqcol_decimal_10_18 = pqtestfile.schema.column(1)
+
+    assert pqcol_decimal_1_9.physical_type == 'INT32'
+    assert pqcol_decimal_10_18.physical_type == 'INT64'
+
+    # Check with store_decimal_as_integer and delta-int encoding.
+    # DELTA_BINARY_PACKED requires parquet physical type to be INT64 or INT32
+    _check_roundtrip(table,
+                     expected=table,
+                     compression="gzip",
+                     use_dictionary=False,
+                     store_decimal_as_integer=True,
+                     column_encoding={
+                         'a': 'DELTA_BINARY_PACKED',
+                         'b': 'DELTA_BINARY_PACKED'
+                     })
+
+    # Check with mixed column types.
+    mixed_table = pa.Table.from_arrays(
+        [arr_decimal_1_9, arr_decimal_10_18, arr_decimal_gt18, arr_bool],
+        names=['a', 'b', 'c', 'd'])
+    _check_roundtrip(mixed_table,
+                     expected=mixed_table,
+                     use_dictionary=False,
+                     store_decimal_as_integer=True)
+
+
 def test_column_encoding():
     arr_float = pa.array(list(map(float, range(100))))
     arr_int = pa.array(list(map(int, range(100))))
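The docstring's "more compact" claim is easiest to see with an encoding that is only defined for integer physical types, as the test above exercises. A hedged sketch, assuming a build with this patch (exact byte counts vary with build and compression): DELTA_BINARY_PACKED is rejected on fixed_len_byte_array pages, but once precision-9 decimals are stored as INT32 it applies, and on a monotone ramp it should shrink the column considerably.

    import io
    from decimal import Decimal
    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({"d": pa.array(list(map(Decimal, range(10_000))),
                                    type=pa.decimal128(9, 2))})

    plain = io.BytesIO()
    pq.write_table(table, plain, use_dictionary=False)

    packed = io.BytesIO()
    pq.write_table(table, packed, use_dictionary=False,
                   store_decimal_as_integer=True,
                   column_encoding={"d": "DELTA_BINARY_PACKED"})

    # expect the packed size to come in well below the plain size here
    print(plain.tell(), packed.tell())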