diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 1d972b78fb9..839f9e0e4c3 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -20,6 +20,7 @@
 #include
 #include
 #include
+#include <iostream>
 #include
 #include
 #include
@@ -756,6 +757,8 @@ std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
   std::unique_ptr<ParquetFileReader::Contents> result(
       new SerializedFile(std::move(source), props));
 
+  std::cout << "Open with crc: " << std::boolalpha << props.page_checksum_verification()
+            << std::endl;
   // Access private methods here, but otherwise unavailable
   SerializedFile* file = static_cast<SerializedFile*>(result.get());
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 31aa058706a..b605e246b4c 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -96,6 +96,9 @@ cdef class ParquetFileFormat(FileFormat):
                              if option in _PARQUET_READ_OPTIONS}
         scan_args = {option: kwargs[option] for option in kwargs
                      if option not in _PARQUET_READ_OPTIONS}
+        print("dataset: read_options_args:", read_options_args, "scan_args", scan_args)
+        print("read_options:", read_options)
+        print("default_fragment_scan_options:", default_fragment_scan_options)
         if read_options and read_options_args:
             duplicates = ', '.join(sorted(read_options_args))
             raise ValueError(f'If `read_options` is given, '
@@ -126,9 +129,11 @@ cdef class ParquetFileFormat(FileFormat):
                 'instance of ParquetReadOptions')
 
         if default_fragment_scan_options is None:
+            print("build scanOptions with ", scan_args)
             default_fragment_scan_options = ParquetFragmentScanOptions(
                 **scan_args)
         elif isinstance(default_fragment_scan_options, dict):
+            print("build scanOptions with ", default_fragment_scan_options)
             default_fragment_scan_options = ParquetFragmentScanOptions(
                 **default_fragment_scan_options)
         elif not isinstance(default_fragment_scan_options,
@@ -701,6 +706,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
     decryption_config : pyarrow.dataset.ParquetDecryptionConfig, default None
         If not None, use the provided ParquetDecryptionConfig to decrypt the
         Parquet file.
+    page_checksum_verification : bool, default False
+        If True, verify the page checksum for each page read from the file.
""" # Avoid mistakingly creating attributes @@ -711,7 +718,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): bint pre_buffer=True, thrift_string_size_limit=None, thrift_container_size_limit=None, - decryption_config=None): + decryption_config=None, + bint page_checksum_verification=False): self.init(shared_ptr[CFragmentScanOptions]( new CParquetFragmentScanOptions())) self.use_buffered_stream = use_buffered_stream @@ -723,6 +731,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): self.thrift_container_size_limit = thrift_container_size_limit if decryption_config is not None: self.parquet_decryption_config = decryption_config + self.page_checksum_verification = page_checksum_verification cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): FragmentScanOptions.init(self, sp) @@ -802,6 +811,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): set_decryption_config(self, config) self._parquet_decryption_config = config + @property + def page_checksum_verification(self): + return self.reader_properties().page_checksum_verification() + + @page_checksum_verification.setter + def page_checksum_verification(self, bint page_checksum_verification): + self.reader_properties().set_page_checksum_verification(page_checksum_verification) + def equals(self, ParquetFragmentScanOptions other): """ Parameters @@ -814,11 +831,12 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): """ attrs = ( self.use_buffered_stream, self.buffer_size, self.pre_buffer, - self.thrift_string_size_limit, self.thrift_container_size_limit) + self.thrift_string_size_limit, self.thrift_container_size_limit, + self.page_checksum_verification) other_attrs = ( other.use_buffered_stream, other.buffer_size, other.pre_buffer, other.thrift_string_size_limit, - other.thrift_container_size_limit) + other.thrift_container_size_limit, other.page_checksum_verification) return attrs == other_attrs @staticmethod @@ -835,6 +853,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): pre_buffer=self.pre_buffer, thrift_string_size_limit=self.thrift_string_size_limit, thrift_container_size_limit=self.thrift_container_size_limit, + page_checksum_verification=self.page_checksum_verification ) return ParquetFragmentScanOptions._reconstruct, (kwargs,) diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 39cdcc063b5..7dd73c7690c 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -380,6 +380,9 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil: shared_ptr[CFileDecryptionProperties] file_decryption_properties() \ const + c_bool page_checksum_verification() const + void set_page_checksum_verification(c_bool check_crc) + CReaderProperties default_reader_properties() cdef cppclass ArrowReaderProperties: @@ -428,6 +431,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: Builder* dictionary_pagesize_limit(int64_t dictionary_pagesize_limit) Builder* enable_write_page_index() Builder* disable_write_page_index() + Builder* enable_page_checksum() + Builder* disable_page_checksum() shared_ptr[WriterProperties] build() cdef cppclass ArrowWriterProperties: @@ -576,7 +581,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties( FileEncryptionProperties encryption_properties=*, write_batch_size=*, dictionary_pagesize_limit=*, - write_page_index=*) except * + write_page_index=*, + page_checksum_enabled=*) except * cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties( diff --git 
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 48091367b2f..40b161e2f5c 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1182,7 +1182,8 @@ cdef class ParquetReader(_Weakrefable):
              coerce_int96_timestamp_unit=None,
              FileDecryptionProperties decryption_properties=None,
              thrift_string_size_limit=None,
-             thrift_container_size_limit=None):
+             thrift_container_size_limit=None,
+             page_checksum_verification=False):
         """
         Open a parquet file for reading.
 
@@ -1198,6 +1199,7 @@ cdef class ParquetReader(_Weakrefable):
         decryption_properties : FileDecryptionProperties, optional
         thrift_string_size_limit : int, optional
         thrift_container_size_limit : int, optional
+        page_checksum_verification : bool, default False
         """
         cdef:
             shared_ptr[CFileMetaData] c_metadata
@@ -1235,6 +1237,9 @@ cdef class ParquetReader(_Weakrefable):
 
         arrow_props.set_pre_buffer(pre_buffer)
 
+        print("open with page_checksum_verification: ", page_checksum_verification)
+        properties.set_page_checksum_verification(page_checksum_verification)
+
         if coerce_int96_timestamp_unit is None:
             # use the default defined in default_arrow_reader_properties()
             pass
@@ -1559,7 +1564,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
         FileEncryptionProperties encryption_properties=None,
         write_batch_size=None,
         dictionary_pagesize_limit=None,
-        write_page_index=False) except *:
+        write_page_index=False,
+        page_checksum_enabled=False) except *:
     """General writer properties"""
     cdef:
         shared_ptr[WriterProperties] properties
@@ -1703,6 +1709,13 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
     # a size larger than this then it will be latched to this value.
     props.max_row_group_length(_MAX_ROW_GROUP_SIZE)
 
+    # checksum
+
+    if page_checksum_enabled:
+        props.enable_page_checksum()
+    else:
+        props.disable_page_checksum()
+
     # page index
 
     if write_page_index:
@@ -1822,7 +1835,8 @@ cdef class ParquetWriter(_Weakrefable):
                   write_batch_size=None,
                   dictionary_pagesize_limit=None,
                   store_schema=True,
-                  write_page_index=False):
+                  write_page_index=False,
+                  page_checksum_enabled=False):
         cdef:
             shared_ptr[WriterProperties] properties
             shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1853,7 +1867,8 @@ cdef class ParquetWriter(_Weakrefable):
             encryption_properties=encryption_properties,
             write_batch_size=write_batch_size,
             dictionary_pagesize_limit=dictionary_pagesize_limit,
-            write_page_index=write_page_index
+            write_page_index=write_page_index,
+            page_checksum_enabled=page_checksum_enabled
         )
         arrow_properties = _create_arrow_writer_properties(
             use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index 072ab7fa117..1e391e7c671 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -280,6 +280,8 @@ class ParquetFile:
         If nothing passed, will be inferred based on path.
         Path will try to be found in the local on-disk filesystem otherwise
         it will be parsed as an URI to determine the filesystem.
+    page_checksum_verification : bool, default False
+        If True, verify the checksum for each page read from the file.
 
     Examples
     --------
@@ -327,7 +329,8 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
                  read_dictionary=None, memory_map=False, buffer_size=0,
                  pre_buffer=False, coerce_int96_timestamp_unit=None,
                  decryption_properties=None, thrift_string_size_limit=None,
-                 thrift_container_size_limit=None, filesystem=None):
+                 thrift_container_size_limit=None, filesystem=None,
+                 page_checksum_verification=False):
 
         self._close_source = getattr(source, 'closed', True)
 
@@ -346,6 +349,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
             decryption_properties=decryption_properties,
             thrift_string_size_limit=thrift_string_size_limit,
             thrift_container_size_limit=thrift_container_size_limit,
+            page_checksum_verification=page_checksum_verification,
         )
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
@@ -887,6 +891,10 @@ def _sanitize_table(table, new_schema, flavor):
     filtering more efficient than the page header, as it gathers all the
     statistics for a Parquet file in a single place, avoiding scattered I/O.
     Note that the page index is not yet used on the read size by PyArrow.
+page_checksum_enabled : bool, default False
+    Whether to write page checksums in general for all columns.
+    Page checksums enable detection of corruption, which might occur during
+    transmission or in storage.
 """
 
 _parquet_writer_example_doc = """\
@@ -980,6 +988,7 @@ def __init__(self, where, schema, filesystem=None,
                  dictionary_pagesize_limit=None,
                  store_schema=True,
                  write_page_index=False,
+                 page_checksum_enabled=False,
                  **options):
         if use_deprecated_int96_timestamps is None:
             # Use int96 timestamps for Spark
@@ -1037,6 +1046,7 @@ def __init__(self, where, schema, filesystem=None,
             dictionary_pagesize_limit=dictionary_pagesize_limit,
             store_schema=store_schema,
             write_page_index=write_page_index,
+            page_checksum_enabled=page_checksum_enabled,
             **options)
         self.is_open = True
 
@@ -1766,6 +1776,8 @@ class ParquetDataset:
         If not None, override the maximum total size of containers allocated
         when decoding Thrift structures. The default limit should be
         sufficient for most Parquet files.
+    page_checksum_verification : bool, default False
+        If True, verify the page checksum for each page read from the file.
 
     Examples
     --------
@@ -1779,7 +1791,8 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
                 use_legacy_dataset=None, pre_buffer=True,
                 coerce_int96_timestamp_unit=None,
                 thrift_string_size_limit=None,
-                thrift_container_size_limit=None):
+                thrift_container_size_limit=None,
+                page_checksum_verification=False):
 
         extra_msg = ""
         if use_legacy_dataset is None:
@@ -1812,6 +1825,7 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
                 metadata_nthreads=metadata_nthreads,
                 thrift_string_size_limit=thrift_string_size_limit,
                 thrift_container_size_limit=thrift_container_size_limit,
+                page_checksum_verification=page_checksum_verification,
             )
             warnings.warn(
                 "Passing 'use_legacy_dataset=True' to get the legacy behaviour is "
@@ -1828,7 +1842,8 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
                 use_legacy_dataset=None, pre_buffer=True,
                 coerce_int96_timestamp_unit=None,
                 thrift_string_size_limit=None,
-                thrift_container_size_limit=None):
+                thrift_container_size_limit=None,
+                page_checksum_verification=False):
         if partitioning != "hive":
             raise ValueError(
                 'Only "hive" for hive-like partitioning is supported when '
@@ -2419,6 +2434,7 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None,
                 coerce_int96_timestamp_unit=None, schema=None,
                 decryption_properties=None, thrift_string_size_limit=None,
                 thrift_container_size_limit=None,
+                page_checksum_verification=False,
                 **kwargs):
         import pyarrow.dataset as ds
 
@@ -2437,6 +2453,7 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None,
             "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit,
             "thrift_string_size_limit": thrift_string_size_limit,
             "thrift_container_size_limit": thrift_container_size_limit,
+            "page_checksum_verification": page_checksum_verification,
         }
         if buffer_size:
             read_options.update(use_buffered_stream=True,
@@ -2855,6 +2872,8 @@ def partitioning(self):
     If not None, override the maximum total size of containers allocated
     when decoding Thrift structures. The default limit should be
     sufficient for most Parquet files.
+page_checksum_verification : bool, default False
+    If True, verify the checksum for each page read from the file.
 
 Returns
 -------
@@ -2949,7 +2968,8 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
                ignore_prefixes=None, pre_buffer=True,
                coerce_int96_timestamp_unit=None,
                decryption_properties=None, thrift_string_size_limit=None,
-               thrift_container_size_limit=None):
+               thrift_container_size_limit=None,
+               page_checksum_verification=False):
     if not use_legacy_dataset:
         if metadata is not None:
             raise ValueError(
@@ -2973,6 +2993,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
                 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
                 thrift_string_size_limit=thrift_string_size_limit,
                 thrift_container_size_limit=thrift_container_size_limit,
+                page_checksum_verification=page_checksum_verification,
            )
        except ImportError:
            # fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -3004,6 +3025,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
                decryption_properties=decryption_properties,
                thrift_string_size_limit=thrift_string_size_limit,
                thrift_container_size_limit=thrift_container_size_limit,
+               page_checksum_verification=page_checksum_verification,
            )
 
        return dataset.read(columns=columns, use_threads=use_threads,
@@ -3038,6 +3060,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
                partitioning=partitioning,
                coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
                use_legacy_dataset=True,
+               page_checksum_verification=page_checksum_verification,
            )
        else:
            pf = ParquetFile(
@@ -3046,7 +3069,8 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
                memory_map=memory_map, buffer_size=buffer_size,
                coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
-               decryption_properties=decryption_properties
+               decryption_properties=decryption_properties,
+               page_checksum_verification=page_checksum_verification,
            )
        return pf.read(columns=columns, use_threads=use_threads,
                       use_pandas_metadata=use_pandas_metadata)
@@ -3101,6 +3125,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
                 dictionary_pagesize_limit=None,
                 store_schema=True,
                 write_page_index=False,
+                page_checksum_enabled=False,
                 **kwargs):
     # Implementor's note: when adding keywords here / updating defaults, also
     # update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions
@@ -3129,6 +3154,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
             dictionary_pagesize_limit=dictionary_pagesize_limit,
             store_schema=store_schema,
             write_page_index=write_page_index,
+            page_checksum_enabled=page_checksum_enabled,
             **kwargs) as writer:
         writer.write_table(table, row_group_size=row_group_size)
     except Exception:
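
The test added below exercises this end to end; as a quick reference, the new reader and writer keywords compose as in the following sketch (placeholder file name, not part of the patch):

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({"col1": [1, 1, 2]})
    # Write CRC page checksums alongside the data pages.
    pq.write_table(table, "checksummed.parquet", page_checksum_enabled=True)
    # Verify the checksums while reading; a corrupted page surfaces as a
    # read error instead of silently wrong data.
    pq.read_table("checksummed.parquet", page_checksum_verification=True)
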
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index dd12a266165..f5ed431752c 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -882,3 +882,40 @@ def test_thrift_size_limits(tempdir):
     assert got == table
     got = pq.read_table(path)
     assert got == table
+
+
+def test_page_checksum_verification(tempdir):
+    # Write some sample data into a parquet file with page checksum enabled
+    original_path = tempdir / 'correct.parquet'
+    table_orig = pa.Table.from_pandas(pd.DataFrame(
+        data={'col1': [1, 1, 2]}, dtype=np.int8))
+    pq.write_table(table_orig, original_path, page_checksum_enabled=True)
+
+    # Read file and verify that the data is correct
+    table_check = pq.read_table(original_path, page_checksum_verification=True)
+    assert table_orig == table_check
+
+    # Read the original file as binary and corrupt its 96th byte. This should be
+    # equivalent to storing the following data:
+    #   pa.Table.from_pandas(pd.DataFrame(
+    #       data={'col1': [1, 2, 2]}, dtype=np.int8))
+    bin_data = bytearray(original_path.read_bytes())
+    assert bin_data[95] != 0x06
+    bin_data[95] = 0x06
+
+    # Write the corrupted data to another parquet file
+    corrupted_path = tempdir / 'corrupted.parquet'
+    corrupted_path.write_bytes(bin_data)
+
+    # Read the corrupted file without page checksum verification
+    table_bad = pq.read_table(corrupted_path, page_checksum_verification=False)
+    # The read should complete without error, but the table has different
+    # content than the original file!
+    assert table_bad != table_orig
+    assert table_bad == pa.Table.from_pandas(
+        pd.DataFrame(data={'col1': [1, 2, 2]}, dtype=np.int8))
+
+    # Read the corrupted file again, but with page checksum verification. This
+    # should raise an exception.
+    with pytest.raises(Exception):
+        _ = pq.read_table(corrupted_path, page_checksum_verification=True)
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 6f3b54b0cd6..8b0b406afbb 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -788,12 +788,15 @@ def test_parquet_scan_options():
     opts5 = ds.ParquetFragmentScanOptions(
         thrift_string_size_limit=123456,
         thrift_container_size_limit=987654,)
+    opts6 = ds.ParquetFragmentScanOptions(
+        page_checksum_verification=True)
 
     assert opts1.use_buffered_stream is False
     assert opts1.buffer_size == 2**13
     assert opts1.pre_buffer is True
     assert opts1.thrift_string_size_limit == 100_000_000  # default in C++
     assert opts1.thrift_container_size_limit == 1_000_000  # default in C++
+    assert opts1.page_checksum_verification is False
 
     assert opts2.use_buffered_stream is False
     assert opts2.buffer_size == 2**12
@@ -810,11 +813,14 @@ def test_parquet_scan_options():
     assert opts5.thrift_string_size_limit == 123456
     assert opts5.thrift_container_size_limit == 987654
 
+    assert opts6.page_checksum_verification is True
+
     assert opts1 == opts1
     assert opts1 != opts2
     assert opts2 != opts3
     assert opts3 != opts4
     assert opts5 != opts1
+    assert opts6 != opts1
 
 
 def test_file_format_pickling(pickle_module):
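
The tests above cover pq.read_table and the dataset scan options; the same reader-side switch is also added to pq.ParquetFile by this patch. A possible usage sketch, with a placeholder path and not taken from the diff:

    import pyarrow.parquet as pq

    # Open a file with page checksum verification enabled; pages are
    # CRC-checked as they are decoded.
    pf = pq.ParquetFile("checksummed.parquet", page_checksum_verification=True)
    table = pf.read()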