19 changes: 16 additions & 3 deletions python/pyarrow/_dataset_parquet.pyx
@@ -670,6 +670,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
If not None, override the maximum total size of containers allocated
when decoding Thrift structures. The default limit should be
sufficient for most Parquet files.
page_checksum_verification : bool, default False
If True, verify the page checksum for each page read from the file.
"""

cdef:
@@ -682,7 +684,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
buffer_size=8192,
bint pre_buffer=False,
thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
bint page_checksum_verification=False):
self.init(shared_ptr[CFragmentScanOptions](
new CParquetFragmentScanOptions()))
self.use_buffered_stream = use_buffered_stream
@@ -692,6 +695,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
self.thrift_string_size_limit = thrift_string_size_limit
if thrift_container_size_limit is not None:
self.thrift_container_size_limit = thrift_container_size_limit
self.page_checksum_verification = page_checksum_verification

cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
FragmentScanOptions.init(self, sp)
@@ -752,6 +756,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
raise ValueError("size must be larger than zero")
self.reader_properties().set_thrift_container_size_limit(size)

@property
def page_checksum_verification(self):
return self.reader_properties().page_checksum_verification()

@page_checksum_verification.setter
def page_checksum_verification(self, bint page_checksum_verification):
self.reader_properties().set_page_checksum_verification(page_checksum_verification)

def equals(self, ParquetFragmentScanOptions other):
"""
Parameters
@@ -764,11 +776,11 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
"""
attrs = (
self.use_buffered_stream, self.buffer_size, self.pre_buffer,
self.thrift_string_size_limit, self.thrift_container_size_limit)
self.thrift_string_size_limit, self.thrift_container_size_limit,
self.page_checksum_verification)
other_attrs = (
other.use_buffered_stream, other.buffer_size, other.pre_buffer,
other.thrift_string_size_limit,
other.thrift_container_size_limit)
other.thrift_container_size_limit, other.page_checksum_verification)
return attrs == other_attrs

@staticmethod
@@ -785,6 +797,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
pre_buffer=self.pre_buffer,
thrift_string_size_limit=self.thrift_string_size_limit,
thrift_container_size_limit=self.thrift_container_size_limit,
page_checksum_verification=self.page_checksum_verification
)
return ParquetFragmentScanOptions._reconstruct, (kwargs,)

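For orientation, here is a minimal read-side sketch of the scan option wired up above. It is not part of the diff and assumes the `page_checksum_verification` keyword exactly as introduced here; the file name and data are placeholders, and a file written without checksums is simply read without verification.

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Write a small placeholder file to scan (checksums are optional on the write side).
table = pa.table({"x": [1, 2, 3]})
pq.write_table(table, "example.parquet")

# Ask the Parquet fragment reader to verify each data page's checksum while scanning.
scan_opts = ds.ParquetFragmentScanOptions(page_checksum_verification=True)
file_format = ds.ParquetFileFormat(default_fragment_scan_options=scan_opts)
dataset = ds.dataset("example.parquet", format=file_format)
print(dataset.to_table())
```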
5 changes: 5 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -380,6 +380,9 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
shared_ptr[CFileDecryptionProperties] file_decryption_properties() \
const

c_bool page_checksum_verification() const
void set_page_checksum_verification(c_bool check_crc)

CReaderProperties default_reader_properties()

cdef cppclass ArrowReaderProperties:
@@ -428,6 +431,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* dictionary_pagesize_limit(int64_t dictionary_pagesize_limit)
Builder* enable_write_page_index()
Builder* disable_write_page_index()
Builder* enable_page_checksum()
Builder* disable_page_checksum()
shared_ptr[WriterProperties] build()

cdef cppclass ArrowWriterProperties:
21 changes: 17 additions & 4 deletions python/pyarrow/_parquet.pyx
@@ -1182,7 +1182,8 @@ cdef class ParquetReader(_Weakrefable):
coerce_int96_timestamp_unit=None,
FileDecryptionProperties decryption_properties=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
page_checksum_verification=False):
"""
Open a parquet file for reading.

@@ -1198,6 +1199,7 @@ cdef class ParquetReader(_Weakrefable):
decryption_properties : FileDecryptionProperties, optional
thrift_string_size_limit : int, optional
thrift_container_size_limit : int, optional
page_checksum_verification : bool, default False
"""
cdef:
shared_ptr[CFileMetaData] c_metadata
@@ -1235,6 +1237,8 @@ cdef class ParquetReader(_Weakrefable):

arrow_props.set_pre_buffer(pre_buffer)

properties.set_page_checksum_verification(page_checksum_verification)

if coerce_int96_timestamp_unit is None:
# use the default defined in default_arrow_reader_properties()
pass
@@ -1559,7 +1563,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
FileEncryptionProperties encryption_properties=None,
write_batch_size=None,
dictionary_pagesize_limit=None,
write_page_index=False) except *:
write_page_index=False,
page_checksum_enabled=False) except *:
"""General writer properties"""
cdef:
shared_ptr[WriterProperties] properties
@@ -1703,6 +1708,12 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
# a size larger than this then it will be latched to this value.
props.max_row_group_length(_MAX_ROW_GROUP_SIZE)

# checksum
if page_checksum_enabled:
props.enable_page_checksum()
else:
props.disable_page_checksum()

# page index

if write_page_index:
@@ -1822,7 +1833,8 @@ cdef class ParquetWriter(_Weakrefable):
write_batch_size=None,
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False):
write_page_index=False,
page_checksum_enabled=False):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1853,7 +1865,8 @@ cdef class ParquetWriter(_Weakrefable):
encryption_properties=encryption_properties,
write_batch_size=write_batch_size,
dictionary_pagesize_limit=dictionary_pagesize_limit,
write_page_index=write_page_index
write_page_index=write_page_index,
page_checksum_enabled=page_checksum_enabled
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
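The `# checksum` branch above maps the new `page_checksum_enabled` flag onto the C++ builder's `enable_page_checksum()` / `disable_page_checksum()` calls. Below is a hedged writer-side sketch, assuming the keyword is forwarded unchanged to `pyarrow.parquet.ParquetWriter`, as the core.py changes in the next file suggest.

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"x": [1, 2, 3]})

# Emit a checksum for every data page written to this file (assumed keyword name).
with pq.ParquetWriter("checksummed.parquet", table.schema,
                      page_checksum_enabled=True) as writer:
    writer.write_table(table)
```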
27 changes: 23 additions & 4 deletions python/pyarrow/parquet/core.py
@@ -280,6 +280,8 @@ class ParquetFile:
If nothing passed, will be inferred based on path.
Path will try to be found in the local on-disk filesystem otherwise
it will be parsed as an URI to determine the filesystem.
page_checksum_verification : bool, default False
If True, verify the checksum for each page read from the file.

Examples
--------
@@ -327,7 +329,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
read_dictionary=None, memory_map=False, buffer_size=0,
pre_buffer=False, coerce_int96_timestamp_unit=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None, filesystem=None):
thrift_container_size_limit=None, filesystem=None,
page_checksum_verification=False):

self._close_source = getattr(source, 'closed', True)

@@ -346,6 +348,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
decryption_properties=decryption_properties,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
)
self.common_metadata = common_metadata
self._nested_paths_by_prefix = self._build_nested_paths()
@@ -881,6 +884,10 @@ def _sanitize_table(table, new_schema, flavor):
filtering more efficient than the page header, as it gathers all the
statistics for a Parquet file in a single place, avoiding scattered I/O.
Note that the page index is not yet used on the read side by PyArrow.
page_checksum_enabled : bool, default False
Whether to write page checksums in general for all columns.
Page checksums make it possible to detect corruption introduced
while the file is transmitted or stored.
"""

_parquet_writer_example_doc = """\
@@ -974,6 +981,7 @@ def __init__(self, where, schema, filesystem=None,
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
page_checksum_enabled=False,
**options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
@@ -1031,6 +1039,7 @@ def __init__(self, where, schema, filesystem=None,
dictionary_pagesize_limit=dictionary_pagesize_limit,
store_schema=store_schema,
write_page_index=write_page_index,
page_checksum_enabled=page_checksum_enabled,
**options)
self.is_open = True

@@ -1759,6 +1768,8 @@ class ParquetDataset:
If not None, override the maximum total size of containers allocated
when decoding Thrift structures. The default limit should be
sufficient for most Parquet files.
page_checksum_verification : bool, default False
If True, verify the page checksum for each page read from the file.

Examples
--------
@@ -1772,7 +1783,8 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
use_legacy_dataset=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
page_checksum_verification=False):

extra_msg = ""
if use_legacy_dataset is None:
@@ -1805,6 +1817,7 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
metadata_nthreads=metadata_nthreads,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
)
warnings.warn(
"Passing 'use_legacy_dataset=True' to get the legacy behaviour is "
@@ -1821,7 +1834,8 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
use_legacy_dataset=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
page_checksum_verification=False):
if partitioning != "hive":
raise ValueError(
'Only "hive" for hive-like partitioning is supported when '
@@ -2412,6 +2426,7 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None,
coerce_int96_timestamp_unit=None, schema=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None,
page_checksum_verification=False,
**kwargs):
import pyarrow.dataset as ds

@@ -2430,6 +2445,7 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None,
"coerce_int96_timestamp_unit": coerce_int96_timestamp_unit,
"thrift_string_size_limit": thrift_string_size_limit,
"thrift_container_size_limit": thrift_container_size_limit,
"page_checksum_verification": page_checksum_verification,
}
if buffer_size:
read_options.update(use_buffered_stream=True,
@@ -2942,7 +2958,8 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
ignore_prefixes=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
page_checksum_verification=False):
if not use_legacy_dataset:
if metadata is not None:
raise ValueError(
@@ -2966,6 +2983,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
)
except ImportError:
# fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -2997,6 +3015,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
decryption_properties=decryption_properties,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
)

return dataset.read(columns=columns, use_threads=use_threads,
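Putting the two halves together, here is a sketch of the intended round trip under the parameter names used in this diff (`page_checksum_enabled` when writing, `page_checksum_verification` when reading); a page that fails verification should surface as a read error rather than silently returning bad data.

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"x": [1, 2, 3]})
pq.write_table(table, "data.parquet", page_checksum_enabled=True)

# Verify page checksums through the ParquetFile interface ...
pf = pq.ParquetFile("data.parquet", page_checksum_verification=True)
roundtrip = pf.read()

# ... or through read_table(), which accepts the same keyword.
roundtrip2 = pq.read_table("data.parquet", page_checksum_verification=True)
assert roundtrip.equals(table) and roundtrip2.equals(table)
```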
6 changes: 6 additions & 0 deletions python/pyarrow/tests/test_dataset.py
@@ -788,12 +788,15 @@ def test_parquet_scan_options():
opts5 = ds.ParquetFragmentScanOptions(
thrift_string_size_limit=123456,
thrift_container_size_limit=987654,)
opts6 = ds.ParquetFragmentScanOptions(
page_checksum_verification=True)

assert opts1.use_buffered_stream is False
assert opts1.buffer_size == 2**13
assert opts1.pre_buffer is False
assert opts1.thrift_string_size_limit == 100_000_000 # default in C++
assert opts1.thrift_container_size_limit == 1_000_000 # default in C++
assert opts1.page_checksum_verification is False

assert opts2.use_buffered_stream is False
assert opts2.buffer_size == 2**12
@@ -810,11 +813,14 @@ def test_parquet_scan_options():
assert opts5.thrift_string_size_limit == 123456
assert opts5.thrift_container_size_limit == 987654

assert opts6.page_checksum_verification is True

assert opts1 == opts1
assert opts1 != opts2
assert opts2 != opts3
assert opts3 != opts4
assert opts5 != opts1
assert opts6 != opts1


def test_file_format_pickling(pickle_module):