From 5796455ab676a0d93ec730d50108cf243fa5a0f5 Mon Sep 17 00:00:00 2001
From: mwish
Date: Wed, 30 Aug 2023 00:04:56 +0800
Subject: [PATCH] [Init] Python CRC skeleton

---
 python/pyarrow/_dataset_parquet.pyx  | 19 ++++++++++++++++---
 python/pyarrow/_parquet.pxd          |  5 +++++
 python/pyarrow/_parquet.pyx          | 21 +++++++++++++++++----
 python/pyarrow/parquet/core.py       | 27 +++++++++++++++++++++++----
 python/pyarrow/tests/test_dataset.py |  6 ++++++
 5 files changed, 67 insertions(+), 11 deletions(-)

diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 79bd270ce54..4ef709dc210 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -670,6 +670,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         If not None, override the maximum total size of containers allocated
         when decoding Thrift structures. The default limit should be
         sufficient for most Parquet files.
+    page_checksum_verification : bool, default False
+        If True, verify the page checksum for each page read from the file.
     """
 
     cdef:
@@ -682,7 +684,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
                  buffer_size=8192,
                  bint pre_buffer=False,
                  thrift_string_size_limit=None,
-                 thrift_container_size_limit=None):
+                 thrift_container_size_limit=None,
+                 bint page_checksum_verification=False):
         self.init(shared_ptr[CFragmentScanOptions](
             new CParquetFragmentScanOptions()))
         self.use_buffered_stream = use_buffered_stream
@@ -692,6 +695,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
             self.thrift_string_size_limit = thrift_string_size_limit
         if thrift_container_size_limit is not None:
             self.thrift_container_size_limit = thrift_container_size_limit
+        self.page_checksum_verification = page_checksum_verification
 
     cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
         FragmentScanOptions.init(self, sp)
@@ -752,6 +756,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
             raise ValueError("size must be larger than zero")
         self.reader_properties().set_thrift_container_size_limit(size)
 
+    @property
+    def page_checksum_verification(self):
+        return self.reader_properties().page_checksum_verification()
+
+    @page_checksum_verification.setter
+    def page_checksum_verification(self, bint page_checksum_verification):
+        self.reader_properties().set_page_checksum_verification(page_checksum_verification)
+
     def equals(self, ParquetFragmentScanOptions other):
         """
         Parameters
         ----------
         other : pyarrow.dataset.ParquetFragmentScanOptions
 
         Returns
         -------
         bool
         """
         attrs = (
             self.use_buffered_stream, self.buffer_size, self.pre_buffer,
-            self.thrift_string_size_limit, self.thrift_container_size_limit)
+            self.thrift_string_size_limit, self.thrift_container_size_limit, self.page_checksum_verification)
         other_attrs = (
             other.use_buffered_stream, other.buffer_size, other.pre_buffer,
             other.thrift_string_size_limit,
-            other.thrift_container_size_limit)
+            other.thrift_container_size_limit, other.page_checksum_verification)
         return attrs == other_attrs
 
     @staticmethod
@@ -785,6 +797,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
             pre_buffer=self.pre_buffer,
             thrift_string_size_limit=self.thrift_string_size_limit,
             thrift_container_size_limit=self.thrift_container_size_limit,
+            page_checksum_verification=self.page_checksum_verification
         )
         return ParquetFragmentScanOptions._reconstruct, (kwargs,)
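Usage sketch for the scan option added above (a minimal sketch assuming this patch is applied; the dataset path is illustrative and not part of the patch):

    import pyarrow.dataset as ds

    # Ask the Parquet scanner to verify the CRC of every data page it reads.
    scan_opts = ds.ParquetFragmentScanOptions(page_checksum_verification=True)
    parquet_format = ds.ParquetFileFormat(default_fragment_scan_options=scan_opts)
    dataset = ds.dataset("/tmp/example_dataset", format=parquet_format)
    table = dataset.to_table()
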
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 39cdcc063b5..df4bffe9fd7 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -380,6 +380,9 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         shared_ptr[CFileDecryptionProperties] file_decryption_properties() \
             const
 
+        c_bool page_checksum_verification() const
+        void set_page_checksum_verification(c_bool check_crc)
+
     CReaderProperties default_reader_properties()
 
     cdef cppclass ArrowReaderProperties:
@@ -428,6 +431,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
             Builder* dictionary_pagesize_limit(int64_t dictionary_pagesize_limit)
             Builder* enable_write_page_index()
             Builder* disable_write_page_index()
+            Builder* enable_page_checksum()
+            Builder* disable_page_checksum()
             shared_ptr[WriterProperties] build()
 
     cdef cppclass ArrowWriterProperties:
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 50b4ed8e86e..36822d7a30c 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1182,7 +1182,8 @@ cdef class ParquetReader(_Weakrefable):
              coerce_int96_timestamp_unit=None,
              FileDecryptionProperties decryption_properties=None,
              thrift_string_size_limit=None,
-             thrift_container_size_limit=None):
+             thrift_container_size_limit=None,
+             page_checksum_verification=False):
         """
         Open a parquet file for reading.
 
@@ -1198,6 +1199,7 @@ cdef class ParquetReader(_Weakrefable):
         decryption_properties : FileDecryptionProperties, optional
         thrift_string_size_limit : int, optional
         thrift_container_size_limit : int, optional
+        page_checksum_verification : bool, default False
         """
         cdef:
             shared_ptr[CFileMetaData] c_metadata
@@ -1235,6 +1237,8 @@
 
         arrow_props.set_pre_buffer(pre_buffer)
 
+        properties.set_page_checksum_verification(page_checksum_verification)
+
         if coerce_int96_timestamp_unit is None:
             # use the default defined in default_arrow_reader_properties()
             pass
@@ -1559,7 +1563,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
         FileEncryptionProperties encryption_properties=None,
         write_batch_size=None,
         dictionary_pagesize_limit=None,
-        write_page_index=False) except *:
+        write_page_index=False,
+        page_checksum_enabled=False) except *:
     """General writer properties"""
     cdef:
         shared_ptr[WriterProperties] properties
@@ -1703,6 +1708,12 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
     # a size larger than this then it will be latched to this value.
     props.max_row_group_length(_MAX_ROW_GROUP_SIZE)
 
+    # checksum
+    if page_checksum_enabled:
+        props.enable_page_checksum()
+    else:
+        props.disable_page_checksum()
+
     # page index
     if write_page_index:
@@ -1822,7 +1833,8 @@ cdef class ParquetWriter(_Weakrefable):
                   write_batch_size=None,
                   dictionary_pagesize_limit=None,
                   store_schema=True,
-                  write_page_index=False):
+                  write_page_index=False,
+                  page_checksum_enabled=False):
         cdef:
             shared_ptr[WriterProperties] properties
             shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1853,7 +1865,8 @@ cdef class ParquetWriter(_Weakrefable):
             encryption_properties=encryption_properties,
             write_batch_size=write_batch_size,
             dictionary_pagesize_limit=dictionary_pagesize_limit,
-            write_page_index=write_page_index
+            write_page_index=write_page_index,
+            page_checksum_enabled=page_checksum_enabled
         )
         arrow_properties = _create_arrow_writer_properties(
             use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index e0cdfee62ef..07f3ae7d1b8 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -280,6 +280,8 @@ class ParquetFile:
         If nothing passed, will be inferred based on path.
         Path will try to be found in the local on-disk filesystem otherwise
         it will be parsed as an URI to determine the filesystem.
+    page_checksum_verification : bool, default False
+        If True, verify the checksum for each page read from the file.
 
     Examples
     --------
@@ -327,7 +329,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
                  read_dictionary=None, memory_map=False, buffer_size=0,
                  pre_buffer=False, coerce_int96_timestamp_unit=None,
                  decryption_properties=None, thrift_string_size_limit=None,
-                 thrift_container_size_limit=None, filesystem=None):
+                 thrift_container_size_limit=None, filesystem=None, page_checksum_verification=False):
 
         self._close_source = getattr(source, 'closed', True)
 
@@ -346,6 +348,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
             decryption_properties=decryption_properties,
             thrift_string_size_limit=thrift_string_size_limit,
             thrift_container_size_limit=thrift_container_size_limit,
+            page_checksum_verification=page_checksum_verification,
         )
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
@@ -881,6 +884,10 @@ def _sanitize_table(table, new_schema, flavor):
     filtering more efficient than the page header, as it gathers all the
     statistics for a Parquet file in a single place, avoiding scattered I/O.
     Note that the page index is not yet used on the read size by PyArrow.
+page_checksum_enabled : bool, default False
+    Whether to write page checksums in general for all columns. Page
+    checksums enable detection of data corruption, which might occur
+    during transmission or in storage.
""" _parquet_writer_example_doc = """\ @@ -974,6 +981,7 @@ def __init__(self, where, schema, filesystem=None, dictionary_pagesize_limit=None, store_schema=True, write_page_index=False, + page_checksum_enabled=False, **options): if use_deprecated_int96_timestamps is None: # Use int96 timestamps for Spark @@ -1031,6 +1039,7 @@ def __init__(self, where, schema, filesystem=None, dictionary_pagesize_limit=dictionary_pagesize_limit, store_schema=store_schema, write_page_index=write_page_index, + page_checksum_enabled=page_checksum_enabled, **options) self.is_open = True @@ -1759,6 +1768,8 @@ class ParquetDataset: If not None, override the maximum total size of containers allocated when decoding Thrift structures. The default limit should be sufficient for most Parquet files. +page_checksum_verification : bool, default False + If True, verify the page checksum for each page read from the file. Examples -------- @@ -1772,7 +1783,8 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None, use_legacy_dataset=None, pre_buffer=True, coerce_int96_timestamp_unit=None, thrift_string_size_limit=None, - thrift_container_size_limit=None): + thrift_container_size_limit=None, + page_checksum_verification=False): extra_msg = "" if use_legacy_dataset is None: @@ -1805,6 +1817,7 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None, metadata_nthreads=metadata_nthreads, thrift_string_size_limit=thrift_string_size_limit, thrift_container_size_limit=thrift_container_size_limit, + page_checksum_verification=page_checksum_verification, ) warnings.warn( "Passing 'use_legacy_dataset=True' to get the legacy behaviour is " @@ -1821,7 +1834,8 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, use_legacy_dataset=None, pre_buffer=True, coerce_int96_timestamp_unit=None, thrift_string_size_limit=None, - thrift_container_size_limit=None): + thrift_container_size_limit=None, + page_checksum_verification=False): if partitioning != "hive": raise ValueError( 'Only "hive" for hive-like partitioning is supported when ' @@ -2412,6 +2426,7 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None, coerce_int96_timestamp_unit=None, schema=None, decryption_properties=None, thrift_string_size_limit=None, thrift_container_size_limit=None, + page_checksum_verification=False, **kwargs): import pyarrow.dataset as ds @@ -2430,6 +2445,7 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None, "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit, "thrift_string_size_limit": thrift_string_size_limit, "thrift_container_size_limit": thrift_container_size_limit, + "page_checksum_verification": page_checksum_verification, } if buffer_size: read_options.update(use_buffered_stream=True, @@ -2942,7 +2958,8 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None, ignore_prefixes=None, pre_buffer=True, coerce_int96_timestamp_unit=None, decryption_properties=None, thrift_string_size_limit=None, - thrift_container_size_limit=None): + thrift_container_size_limit=None, + page_checksum_verification=False): if not use_legacy_dataset: if metadata is not None: raise ValueError( @@ -2966,6 +2983,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit, thrift_string_size_limit=thrift_string_size_limit, thrift_container_size_limit=thrift_container_size_limit, + page_checksum_verification=page_checksum_verification, ) except ImportError: # fall back on ParquetFile for simple cases when 
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index b8a0c380899..d9546a96efe 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -788,12 +788,15 @@ def test_parquet_scan_options():
     opts5 = ds.ParquetFragmentScanOptions(
         thrift_string_size_limit=123456,
         thrift_container_size_limit=987654,)
+    opts6 = ds.ParquetFragmentScanOptions(
+        page_checksum_verification=True)
 
     assert opts1.use_buffered_stream is False
     assert opts1.buffer_size == 2**13
     assert opts1.pre_buffer is False
     assert opts1.thrift_string_size_limit == 100_000_000  # default in C++
     assert opts1.thrift_container_size_limit == 1_000_000  # default in C++
+    assert opts1.page_checksum_verification is False
 
     assert opts2.use_buffered_stream is False
     assert opts2.buffer_size == 2**12
@@ -810,11 +813,14 @@ def test_parquet_scan_options():
     assert opts5.thrift_string_size_limit == 123456
     assert opts5.thrift_container_size_limit == 987654
 
+    assert opts6.page_checksum_verification is True
+
     assert opts1 == opts1
     assert opts1 != opts2
     assert opts2 != opts3
     assert opts3 != opts4
     assert opts5 != opts1
+    assert opts6 != opts1
 
 
 def test_file_format_pickling(pickle_module):
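A natural follow-up test, sketched here only (not part of this patch; it assumes the `tempdir` pytest fixture used elsewhere in test_dataset.py), would round-trip the writer and reader flags together:

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    def test_parquet_page_checksum_roundtrip(tempdir):
        # Write a file with page checksums, then scan it with verification on.
        path = str(tempdir / "checksum.parquet")
        table = pa.table({"a": list(range(100))})
        pq.write_table(table, path, page_checksum_enabled=True)

        scan_opts = ds.ParquetFragmentScanOptions(page_checksum_verification=True)
        fmt = ds.ParquetFileFormat(default_fragment_scan_options=scan_opts)
        assert ds.dataset(path, format=fmt).to_table().equals(table)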