From d54b22f00ec8937e53401501de54f2d93c0d048b Mon Sep 17 00:00:00 2001 From: Francesco Zardi Date: Tue, 14 Nov 2023 20:12:27 +0100 Subject: [PATCH 1/2] [Python][Parquet] Use temporary pytest directory for writing datasets --- python/pyarrow/tests/parquet/test_parquet_writer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/tests/parquet/test_parquet_writer.py b/python/pyarrow/tests/parquet/test_parquet_writer.py index 5e6895c8dc2..b902541015a 100644 --- a/python/pyarrow/tests/parquet/test_parquet_writer.py +++ b/python/pyarrow/tests/parquet/test_parquet_writer.py @@ -94,14 +94,14 @@ def test_validate_schema_write_table(tempdir): w.write_table(simple_table) -def test_parquet_invalid_writer(): +def test_parquet_invalid_writer(tempdir): # avoid segfaults with invalid construction with pytest.raises(TypeError): some_schema = pa.schema([pa.field("x", pa.int32())]) pq.ParquetWriter(None, some_schema) with pytest.raises(TypeError): - pq.ParquetWriter("some_path", None) + pq.ParquetWriter(tempdir / "some_path", None) @pytest.mark.pandas From b25c124316cf7e73256682ca335c1db1b3e14c14 Mon Sep 17 00:00:00 2001 From: Francesco Zardi Date: Thu, 19 Oct 2023 21:46:07 +0200 Subject: [PATCH 2/2] [Python][Parquet] Expose option to enable page checksum verification Co-authored-by: Alenka Frim Co-authored-by: Joris Van den Bossche Co-authored-by: mwish --- cpp/src/arrow/dataset/file_parquet.cc | 4 + python/pyarrow/_dataset_parquet.pyx | 22 +++- python/pyarrow/_parquet.pxd | 8 +- python/pyarrow/_parquet.pyx | 22 +++- python/pyarrow/parquet/core.py | 37 +++++- python/pyarrow/tests/parquet/test_basic.py | 132 +++++++++++++++++++++ python/pyarrow/tests/test_dataset.py | 80 +++++++++++++ 7 files changed, 293 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 65ad70181f2..3afe4ec85cf 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -98,6 +98,10 @@ parquet::ReaderProperties MakeReaderProperties( parquet_scan_options->reader_properties->thrift_string_size_limit()); properties.set_thrift_container_size_limit( parquet_scan_options->reader_properties->thrift_container_size_limit()); + + properties.set_page_checksum_verification( + parquet_scan_options->reader_properties->page_checksum_verification()); + return properties; } diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx index 31aa058706a..bd4151624dc 100644 --- a/python/pyarrow/_dataset_parquet.pyx +++ b/python/pyarrow/_dataset_parquet.pyx @@ -606,6 +606,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions): write_batch_size=self._properties["write_batch_size"], dictionary_pagesize_limit=self._properties["dictionary_pagesize_limit"], write_page_index=self._properties["write_page_index"], + write_page_checksum=self._properties["write_page_checksum"], ) def _set_arrow_properties(self): @@ -655,6 +656,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions): dictionary_pagesize_limit=None, write_page_index=False, encryption_config=None, + write_page_checksum=False, ) self._set_properties() @@ -701,6 +703,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): decryption_config : pyarrow.dataset.ParquetDecryptionConfig, default None If not None, use the provided ParquetDecryptionConfig to decrypt the Parquet file. + page_checksum_verification : bool, default False + If True, verify the page checksum for each page read from the file. 
""" # Avoid mistakingly creating attributes @@ -711,7 +715,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): bint pre_buffer=True, thrift_string_size_limit=None, thrift_container_size_limit=None, - decryption_config=None): + decryption_config=None, + bint page_checksum_verification=False): self.init(shared_ptr[CFragmentScanOptions]( new CParquetFragmentScanOptions())) self.use_buffered_stream = use_buffered_stream @@ -723,6 +728,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): self.thrift_container_size_limit = thrift_container_size_limit if decryption_config is not None: self.parquet_decryption_config = decryption_config + self.page_checksum_verification = page_checksum_verification cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): FragmentScanOptions.init(self, sp) @@ -802,6 +808,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): set_decryption_config(self, config) self._parquet_decryption_config = config + @property + def page_checksum_verification(self): + return self.reader_properties().page_checksum_verification() + + @page_checksum_verification.setter + def page_checksum_verification(self, bint page_checksum_verification): + self.reader_properties().set_page_checksum_verification(page_checksum_verification) + def equals(self, ParquetFragmentScanOptions other): """ Parameters @@ -814,11 +828,12 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): """ attrs = ( self.use_buffered_stream, self.buffer_size, self.pre_buffer, - self.thrift_string_size_limit, self.thrift_container_size_limit) + self.thrift_string_size_limit, self.thrift_container_size_limit, + self.page_checksum_verification) other_attrs = ( other.use_buffered_stream, other.buffer_size, other.pre_buffer, other.thrift_string_size_limit, - other.thrift_container_size_limit) + other.thrift_container_size_limit, other.page_checksum_verification) return attrs == other_attrs @staticmethod @@ -835,6 +850,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): pre_buffer=self.pre_buffer, thrift_string_size_limit=self.thrift_string_size_limit, thrift_container_size_limit=self.thrift_container_size_limit, + page_checksum_verification=self.page_checksum_verification ) return ParquetFragmentScanOptions._reconstruct, (kwargs,) diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 39cdcc063b5..59b50ceda8c 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -380,6 +380,9 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil: shared_ptr[CFileDecryptionProperties] file_decryption_properties() \ const + c_bool page_checksum_verification() const + void set_page_checksum_verification(c_bool check_crc) + CReaderProperties default_reader_properties() cdef cppclass ArrowReaderProperties: @@ -428,6 +431,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: Builder* dictionary_pagesize_limit(int64_t dictionary_pagesize_limit) Builder* enable_write_page_index() Builder* disable_write_page_index() + Builder* enable_page_checksum() + Builder* disable_page_checksum() shared_ptr[WriterProperties] build() cdef cppclass ArrowWriterProperties: @@ -576,7 +581,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties( FileEncryptionProperties encryption_properties=*, write_batch_size=*, dictionary_pagesize_limit=*, - write_page_index=*) except * + write_page_index=*, + write_page_checksum=*) except * cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties( diff --git 
a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 48091367b2f..bce19b730a4 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -1182,7 +1182,8 @@ cdef class ParquetReader(_Weakrefable): coerce_int96_timestamp_unit=None, FileDecryptionProperties decryption_properties=None, thrift_string_size_limit=None, - thrift_container_size_limit=None): + thrift_container_size_limit=None, + page_checksum_verification=False): """ Open a parquet file for reading. @@ -1198,6 +1199,7 @@ cdef class ParquetReader(_Weakrefable): decryption_properties : FileDecryptionProperties, optional thrift_string_size_limit : int, optional thrift_container_size_limit : int, optional + page_checksum_verification : bool, default False """ cdef: shared_ptr[CFileMetaData] c_metadata @@ -1235,6 +1237,8 @@ cdef class ParquetReader(_Weakrefable): arrow_props.set_pre_buffer(pre_buffer) + properties.set_page_checksum_verification(page_checksum_verification) + if coerce_int96_timestamp_unit is None: # use the default defined in default_arrow_reader_properties() pass @@ -1559,7 +1563,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties( FileEncryptionProperties encryption_properties=None, write_batch_size=None, dictionary_pagesize_limit=None, - write_page_index=False) except *: + write_page_index=False, + write_page_checksum=False) except *: """General writer properties""" cdef: shared_ptr[WriterProperties] properties @@ -1703,6 +1708,13 @@ cdef shared_ptr[WriterProperties] _create_writer_properties( # a size larger than this then it will be latched to this value. props.max_row_group_length(_MAX_ROW_GROUP_SIZE) + # checksum + + if write_page_checksum: + props.enable_page_checksum() + else: + props.disable_page_checksum() + # page index if write_page_index: @@ -1822,7 +1834,8 @@ cdef class ParquetWriter(_Weakrefable): write_batch_size=None, dictionary_pagesize_limit=None, store_schema=True, - write_page_index=False): + write_page_index=False, + write_page_checksum=False): cdef: shared_ptr[WriterProperties] properties shared_ptr[ArrowWriterProperties] arrow_properties @@ -1853,7 +1866,8 @@ cdef class ParquetWriter(_Weakrefable): encryption_properties=encryption_properties, write_batch_size=write_batch_size, dictionary_pagesize_limit=dictionary_pagesize_limit, - write_page_index=write_page_index + write_page_index=write_page_index, + write_page_checksum=write_page_checksum ) arrow_properties = _create_arrow_writer_properties( use_deprecated_int96_timestamps=use_deprecated_int96_timestamps, diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 072ab7fa117..096e9603847 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -280,6 +280,8 @@ class ParquetFile: If nothing passed, will be inferred based on path. Path will try to be found in the local on-disk filesystem otherwise it will be parsed as an URI to determine the filesystem. + page_checksum_verification : bool, default False + If True, verify the checksum for each page read from the file. 
Examples -------- @@ -327,7 +329,8 @@ def __init__(self, source, *, metadata=None, common_metadata=None, read_dictionary=None, memory_map=False, buffer_size=0, pre_buffer=False, coerce_int96_timestamp_unit=None, decryption_properties=None, thrift_string_size_limit=None, - thrift_container_size_limit=None, filesystem=None): + thrift_container_size_limit=None, filesystem=None, + page_checksum_verification=False): self._close_source = getattr(source, 'closed', True) @@ -346,6 +349,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None, decryption_properties=decryption_properties, thrift_string_size_limit=thrift_string_size_limit, thrift_container_size_limit=thrift_container_size_limit, + page_checksum_verification=page_checksum_verification, ) self.common_metadata = common_metadata self._nested_paths_by_prefix = self._build_nested_paths() @@ -887,6 +891,10 @@ def _sanitize_table(table, new_schema, flavor): filtering more efficient than the page header, as it gathers all the statistics for a Parquet file in a single place, avoiding scattered I/O. Note that the page index is not yet used on the read size by PyArrow. +write_page_checksum : bool, default False + Whether to write page checksums in general for all columns. + Page checksums enable detection of data corruption, which might occur during + transmission or in the storage. """ _parquet_writer_example_doc = """\ @@ -980,6 +988,7 @@ def __init__(self, where, schema, filesystem=None, dictionary_pagesize_limit=None, store_schema=True, write_page_index=False, + write_page_checksum=False, **options): if use_deprecated_int96_timestamps is None: # Use int96 timestamps for Spark @@ -1037,6 +1046,7 @@ def __init__(self, where, schema, filesystem=None, dictionary_pagesize_limit=dictionary_pagesize_limit, store_schema=store_schema, write_page_index=write_page_index, + write_page_checksum=write_page_checksum, **options) self.is_open = True @@ -1766,6 +1776,8 @@ class ParquetDataset: If not None, override the maximum total size of containers allocated when decoding Thrift structures. The default limit should be sufficient for most Parquet files. +page_checksum_verification : bool, default False + If True, verify the page checksum for each page read from the file. 
Examples -------- @@ -1779,7 +1791,8 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None, use_legacy_dataset=None, pre_buffer=True, coerce_int96_timestamp_unit=None, thrift_string_size_limit=None, - thrift_container_size_limit=None): + thrift_container_size_limit=None, + page_checksum_verification=False): extra_msg = "" if use_legacy_dataset is None: @@ -1812,6 +1825,7 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None, metadata_nthreads=metadata_nthreads, thrift_string_size_limit=thrift_string_size_limit, thrift_container_size_limit=thrift_container_size_limit, + page_checksum_verification=page_checksum_verification, ) warnings.warn( "Passing 'use_legacy_dataset=True' to get the legacy behaviour is " @@ -1828,7 +1842,8 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, use_legacy_dataset=None, pre_buffer=True, coerce_int96_timestamp_unit=None, thrift_string_size_limit=None, - thrift_container_size_limit=None): + thrift_container_size_limit=None, + page_checksum_verification=False): if partitioning != "hive": raise ValueError( 'Only "hive" for hive-like partitioning is supported when ' @@ -2419,6 +2434,7 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None, coerce_int96_timestamp_unit=None, schema=None, decryption_properties=None, thrift_string_size_limit=None, thrift_container_size_limit=None, + page_checksum_verification=False, **kwargs): import pyarrow.dataset as ds @@ -2437,6 +2453,7 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None, "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit, "thrift_string_size_limit": thrift_string_size_limit, "thrift_container_size_limit": thrift_container_size_limit, + "page_checksum_verification": page_checksum_verification, } if buffer_size: read_options.update(use_buffered_stream=True, @@ -2855,6 +2872,8 @@ def partitioning(self): If not None, override the maximum total size of containers allocated when decoding Thrift structures. The default limit should be sufficient for most Parquet files. +page_checksum_verification : bool, default False + If True, verify the checksum for each page read from the file. 
Returns ------- @@ -2949,7 +2968,8 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None, ignore_prefixes=None, pre_buffer=True, coerce_int96_timestamp_unit=None, decryption_properties=None, thrift_string_size_limit=None, - thrift_container_size_limit=None): + thrift_container_size_limit=None, + page_checksum_verification=False): if not use_legacy_dataset: if metadata is not None: raise ValueError( @@ -2973,6 +2993,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit, thrift_string_size_limit=thrift_string_size_limit, thrift_container_size_limit=thrift_container_size_limit, + page_checksum_verification=page_checksum_verification, ) except ImportError: # fall back on ParquetFile for simple cases when pyarrow.dataset @@ -3004,6 +3025,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None, decryption_properties=decryption_properties, thrift_string_size_limit=thrift_string_size_limit, thrift_container_size_limit=thrift_container_size_limit, + page_checksum_verification=page_checksum_verification, ) return dataset.read(columns=columns, use_threads=use_threads, @@ -3020,6 +3042,11 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None, "The 'ignore_prefixes' keyword is only supported when " "use_legacy_dataset=False") + if page_checksum_verification: + raise ValueError( + "The 'page_checksum_verification' keyword is only supported when " + "use_legacy_dataset=False") + if schema is not None: raise ValueError( "The 'schema' argument is only supported when " @@ -3101,6 +3128,7 @@ def write_table(table, where, row_group_size=None, version='2.6', dictionary_pagesize_limit=None, store_schema=True, write_page_index=False, + write_page_checksum=False, **kwargs): # Implementor's note: when adding keywords here / updating defaults, also # update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions @@ -3129,6 +3157,7 @@ def write_table(table, where, row_group_size=None, version='2.6', dictionary_pagesize_limit=dictionary_pagesize_limit, store_schema=store_schema, write_page_index=write_page_index, + write_page_checksum=write_page_checksum, **kwargs) as writer: writer.write_table(table, row_group_size=row_group_size) except Exception: diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py index dd12a266165..26c52b1cc59 100644 --- a/python/pyarrow/tests/parquet/test_basic.py +++ b/python/pyarrow/tests/parquet/test_basic.py @@ -18,6 +18,7 @@ from collections import OrderedDict import io import warnings +from shutil import copytree import numpy as np import pytest @@ -882,3 +883,134 @@ def test_thrift_size_limits(tempdir): assert got == table got = pq.read_table(path) assert got == table + + +def test_page_checksum_verification_write_table(tempdir): + """Check that checksum verification works for datasets created with + pq.write_table()""" + + # Write some sample data into a parquet file with page checksum enabled + original_path = tempdir / 'correct.parquet' + table_orig = pa.table({'a': [1, 2, 3, 4]}) + pq.write_table(table_orig, original_path, write_page_checksum=True) + + # Read file and verify that the data is correct + table_check = pq.read_table(original_path, page_checksum_verification=True) + assert table_orig == table_check + + # Read the original file as binary and swap the 31-th and 36-th bytes. 
This + # should be equivalent to storing the following data: + # pa.table({'a': [1, 3, 2, 4]}) + bin_data = bytearray(original_path.read_bytes()) + + # Swap two bytes to emulate corruption. Also, check that the two bytes are + # different, otherwise no corruption occurs + assert bin_data[31] != bin_data[36] + bin_data[31], bin_data[36] = bin_data[36], bin_data[31] + + # Write the corrupted data to another parquet file + corrupted_path = tempdir / 'corrupted.parquet' + corrupted_path.write_bytes(bin_data) + + # Case 1: Reading the corrupted file with read_table() and without page + # checksum verification succeeds but yields corrupted data + table_corrupt = pq.read_table(corrupted_path, + page_checksum_verification=False) + # The read should complete without error, but the table has different + # content than the original file! + assert table_corrupt != table_orig + assert table_corrupt == pa.table({'a': [1, 3, 2, 4]}) + + # Case 2: Reading the corrupted file with read_table() and with page + # checksum verification enabled raises an exception + with pytest.raises(OSError, match="CRC checksum verification"): + _ = pq.read_table(corrupted_path, page_checksum_verification=True) + + # Case 3: Reading the corrupted file with ParquetFile.read() and without + # page checksum verification succeeds but yields corrupted data + corrupted_pq_file = pq.ParquetFile(corrupted_path, + page_checksum_verification=False) + table_corrupt2 = corrupted_pq_file.read() + assert table_corrupt2 != table_orig + assert table_corrupt2 == pa.table({'a': [1, 3, 2, 4]}) + + # Case 4: Reading the corrupted file with ParquetFile.read() and with page + # checksum verification enabled raises an exception + corrupted_pq_file = pq.ParquetFile(corrupted_path, + page_checksum_verification=True) + # Accessing the data should result in an error + with pytest.raises(OSError, match="CRC checksum verification"): + _ = corrupted_pq_file.read() + + # Case 5: Check that enabling page checksum verification in combination + # with legacy dataset raises an exception + with pytest.raises(ValueError, match="page_checksum_verification"): + _ = pq.read_table(corrupted_path, + page_checksum_verification=True, + use_legacy_dataset=True) + + +@pytest.mark.dataset +@pytest.mark.parametrize( + "use_legacy_dataset", + [ + False, + pytest.param( + True, + marks=pytest.mark.filterwarnings( + "ignore:Passing 'use_legacy_dataset=True':FutureWarning" + ), + ), + ], +) +def test_checksum_write_to_dataset(tempdir, use_legacy_dataset): + """Check that checksum verification works for datasets created with + pq.write_to_dataset""" + + table_orig = pa.table({'a': [1, 2, 3, 4]}) + + # Write a sample dataset with page checksum enabled + original_dir_path = tempdir / 'correct_dir' + pq.write_to_dataset(table_orig, + original_dir_path, + write_page_checksum=True, + use_legacy_dataset=use_legacy_dataset) + + # Read file and verify that the data is correct + original_file_path_list = list(original_dir_path.iterdir()) + assert len(original_file_path_list) == 1 + original_path = original_file_path_list[0] + table_check = pq.read_table(original_path, page_checksum_verification=True) + assert table_orig == table_check + + # Read the original file as binary and swap the 31-th and 36-th bytes. This + # should be equivalent to storing the following data: + # pa.table({'a': [1, 3, 2, 4]}) + bin_data = bytearray(original_path.read_bytes()) + + # Swap two bytes to emulate corruption. 
Also, check that the two bytes are + # different, otherwise no corruption occurs + assert bin_data[31] != bin_data[36] + bin_data[31], bin_data[36] = bin_data[36], bin_data[31] + + # Write the corrupted data to another parquet dataset + # Copy dataset dir (which should be just one file) + corrupted_dir_path = tempdir / 'corrupted_dir' + copytree(original_dir_path, corrupted_dir_path) + # Corrupt just the one file with the dataset + corrupted_file_path = corrupted_dir_path / original_path.name + corrupted_file_path.write_bytes(bin_data) + + # Case 1: Reading the corrupted file with read_table() and without page + # checksum verification succeeds but yields corrupted data + table_corrupt = pq.read_table(corrupted_file_path, + page_checksum_verification=False) + # The read should complete without error, but the table has different + # content than the original file! + assert table_corrupt != table_orig + assert table_corrupt == pa.table({'a': [1, 3, 2, 4]}) + + # Case 2: Reading the corrupted file with read_table() and with page + # checksum verification enabled raises an exception + with pytest.raises(OSError, match="CRC checksum verification"): + _ = pq.read_table(corrupted_file_path, page_checksum_verification=True) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 6f3b54b0cd6..c6967326b36 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -25,6 +25,7 @@ import tempfile import threading import time +from shutil import copytree from urllib.parse import quote @@ -788,12 +789,15 @@ def test_parquet_scan_options(): opts5 = ds.ParquetFragmentScanOptions( thrift_string_size_limit=123456, thrift_container_size_limit=987654,) + opts6 = ds.ParquetFragmentScanOptions( + page_checksum_verification=True) assert opts1.use_buffered_stream is False assert opts1.buffer_size == 2**13 assert opts1.pre_buffer is True assert opts1.thrift_string_size_limit == 100_000_000 # default in C++ assert opts1.thrift_container_size_limit == 1_000_000 # default in C++ + assert opts1.page_checksum_verification is False assert opts2.use_buffered_stream is False assert opts2.buffer_size == 2**12 @@ -810,11 +814,14 @@ def test_parquet_scan_options(): assert opts5.thrift_string_size_limit == 123456 assert opts5.thrift_container_size_limit == 987654 + assert opts6.page_checksum_verification is True + assert opts1 == opts1 assert opts1 != opts2 assert opts2 != opts3 assert opts3 != opts4 assert opts5 != opts1 + assert opts6 != opts1 def test_file_format_pickling(pickle_module): @@ -5376,3 +5383,76 @@ def test_dataset_sort_by(tempdir, dstype): sorted_tab_dict = sorted_tab.to_table().to_pydict() assert sorted_tab_dict["a"] == [5, 7, 7, 35] assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"] + + +def test_checksum_write_dataset_read_dataset_to_table(tempdir): + """Check that checksum verification works for datasets created with + ds.write_dataset and read with ds.dataset.to_table""" + + table_orig = pa.table({'a': [1, 2, 3, 4]}) + + # Write a sample dataset with page checksum enabled + pq_write_format = pa.dataset.ParquetFileFormat() + write_options = pq_write_format.make_write_options( + write_page_checksum=True) + + original_dir_path = tempdir / 'correct_dir' + ds.write_dataset( + data=table_orig, + base_dir=original_dir_path, + format=pq_write_format, + file_options=write_options, + ) + + # Open dataset and verify that the data is correct + pq_scan_opts_crc = ds.ParquetFragmentScanOptions( + page_checksum_verification=True) + 
pq_read_format_crc = pa.dataset.ParquetFileFormat( + default_fragment_scan_options=pq_scan_opts_crc) + table_check = ds.dataset( + original_dir_path, + format=pq_read_format_crc + ).to_table() + assert table_orig == table_check + + # Copy dataset dir (which should be just one file) + corrupted_dir_path = tempdir / 'corrupted_dir' + copytree(original_dir_path, corrupted_dir_path) + + # Read the only file in the path as binary and swap the 31-th and 36-th + # bytes. This should be equivalent to storing the following data: + # pa.table({'a': [1, 3, 2, 4]}) + corrupted_file_path_list = list(corrupted_dir_path.iterdir()) + assert len(corrupted_file_path_list) == 1 + corrupted_file_path = corrupted_file_path_list[0] + bin_data = bytearray(corrupted_file_path.read_bytes()) + + # Swap two bytes to emulate corruption. Also, check that the two bytes are + # different, otherwise no corruption occurs + assert bin_data[31] != bin_data[36] + bin_data[31], bin_data[36] = bin_data[36], bin_data[31] + + # Write the corrupted data to the parquet file + corrupted_file_path.write_bytes(bin_data) + + # Case 1: Reading the corrupted file with dataset().to_table() and without + # page checksum verification succeeds but yields corrupted data + pq_scan_opts_no_crc = ds.ParquetFragmentScanOptions( + page_checksum_verification=False) + pq_read_format_no_crc = pa.dataset.ParquetFileFormat( + default_fragment_scan_options=pq_scan_opts_no_crc) + table_corrupt = ds.dataset( + corrupted_dir_path, format=pq_read_format_no_crc).to_table() + + # The read should complete without error, but the table has different + # content than the original file! + assert table_corrupt != table_orig + assert table_corrupt == pa.table({'a': [1, 3, 2, 4]}) + + # Case 2: Reading the corrupted file with read_table() and with page + # checksum verification enabled raises an exception + with pytest.raises(OSError, match="CRC checksum verification"): + _ = ds.dataset( + corrupted_dir_path, + format=pq_read_format_crc + ).to_table()
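
A minimal usage sketch of the reader/writer options this series adds, assuming a pyarrow build that includes both patches; the file name used here is illustrative, not taken from the patch:

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({'a': [1, 2, 3, 4]})

# Writer side: write_page_checksum=True makes the Parquet writer store a CRC
# checksum for every data page it writes.
pq.write_table(table, 'data.parquet', write_page_checksum=True)

# Reader side: page_checksum_verification=True makes the reader recompute each
# page checksum and compare it against the stored value; a mismatch surfaces
# as an OSError mentioning "CRC checksum verification", as exercised in
# test_basic.py above.
assert pq.read_table('data.parquet', page_checksum_verification=True) == table

# The same keyword is exposed on ParquetFile.
pf = pq.ParquetFile('data.parquet', page_checksum_verification=True)
_ = pf.read()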
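The dataset layer exposes the same switches, as exercised in test_dataset.py above: write_page_checksum travels through ParquetFileFormat.make_write_options(), and verification is requested via ParquetFragmentScanOptions attached to the read-side format. A sketch under the same assumptions (the directory name is illustrative):

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({'a': [1, 2, 3, 4]})

# Write a dataset whose Parquet files carry page checksums.
fmt = ds.ParquetFileFormat()
ds.write_dataset(
    table,
    'checked_dataset',
    format=fmt,
    file_options=fmt.make_write_options(write_page_checksum=True),
)

# Read it back with per-page verification enabled through the fragment scan
# options of the format used for reading.
scan_opts = ds.ParquetFragmentScanOptions(page_checksum_verification=True)
read_fmt = ds.ParquetFileFormat(default_fragment_scan_options=scan_opts)
assert ds.dataset('checked_dataset', format=read_fmt).to_table() == table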