4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -98,6 +98,10 @@ parquet::ReaderProperties MakeReaderProperties(
parquet_scan_options->reader_properties->thrift_string_size_limit());
properties.set_thrift_container_size_limit(
parquet_scan_options->reader_properties->thrift_container_size_limit());

properties.set_page_checksum_verification(
parquet_scan_options->reader_properties->page_checksum_verification());

return properties;
}

22 changes: 19 additions & 3 deletions python/pyarrow/_dataset_parquet.pyx
@@ -606,6 +606,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
write_batch_size=self._properties["write_batch_size"],
dictionary_pagesize_limit=self._properties["dictionary_pagesize_limit"],
write_page_index=self._properties["write_page_index"],
write_page_checksum=self._properties["write_page_checksum"],
)

def _set_arrow_properties(self):
@@ -655,6 +656,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
dictionary_pagesize_limit=None,
write_page_index=False,
encryption_config=None,
write_page_checksum=False,
)

self._set_properties()
@@ -701,6 +703,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
decryption_config : pyarrow.dataset.ParquetDecryptionConfig, default None
If not None, use the provided ParquetDecryptionConfig to decrypt the
Parquet file.
page_checksum_verification : bool, default False
If True, verify the checksum for each page read from the file.
"""

# Avoid mistakenly creating attributes
@@ -711,7 +715,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
bint pre_buffer=True,
thrift_string_size_limit=None,
thrift_container_size_limit=None,
decryption_config=None):
decryption_config=None,
bint page_checksum_verification=False):
self.init(shared_ptr[CFragmentScanOptions](
new CParquetFragmentScanOptions()))
self.use_buffered_stream = use_buffered_stream
@@ -723,6 +728,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
self.thrift_container_size_limit = thrift_container_size_limit
if decryption_config is not None:
self.parquet_decryption_config = decryption_config
self.page_checksum_verification = page_checksum_verification

cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
FragmentScanOptions.init(self, sp)
@@ -802,6 +808,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
set_decryption_config(self, config)
self._parquet_decryption_config = config

@property
def page_checksum_verification(self):
return self.reader_properties().page_checksum_verification()

@page_checksum_verification.setter
def page_checksum_verification(self, bint page_checksum_verification):
self.reader_properties().set_page_checksum_verification(page_checksum_verification)

def equals(self, ParquetFragmentScanOptions other):
"""
Parameters
@@ -814,11 +828,12 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
"""
attrs = (
self.use_buffered_stream, self.buffer_size, self.pre_buffer,
self.thrift_string_size_limit, self.thrift_container_size_limit)
self.thrift_string_size_limit, self.thrift_container_size_limit,
self.page_checksum_verification)
other_attrs = (
other.use_buffered_stream, other.buffer_size, other.pre_buffer,
other.thrift_string_size_limit,
other.thrift_container_size_limit)
other.thrift_container_size_limit, other.page_checksum_verification)
return attrs == other_attrs

@staticmethod
@@ -835,6 +850,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
pre_buffer=self.pre_buffer,
thrift_string_size_limit=self.thrift_string_size_limit,
thrift_container_size_limit=self.thrift_container_size_limit,
page_checksum_verification=self.page_checksum_verification
)
return ParquetFragmentScanOptions._reconstruct, (kwargs,)

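A minimal usage sketch for the new scan option through the datasets API (the path "example.parquet" is hypothetical and assumed to have been written with page checksums enabled):

import pyarrow.dataset as ds

# Verify page checksums while scanning; "example.parquet" is a hypothetical
# input assumed to contain CRC checksums written at write time.
scan_options = ds.ParquetFragmentScanOptions(page_checksum_verification=True)
parquet_format = ds.ParquetFileFormat(default_fragment_scan_options=scan_options)
dataset = ds.dataset("example.parquet", format=parquet_format)
table = dataset.to_table()  # scanning should error out if a page CRC mismatches
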
8 changes: 7 additions & 1 deletion python/pyarrow/_parquet.pxd
@@ -380,6 +380,9 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
shared_ptr[CFileDecryptionProperties] file_decryption_properties() \
const

c_bool page_checksum_verification() const
void set_page_checksum_verification(c_bool check_crc)

CReaderProperties default_reader_properties()

cdef cppclass ArrowReaderProperties:
@@ -428,6 +431,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* dictionary_pagesize_limit(int64_t dictionary_pagesize_limit)
Builder* enable_write_page_index()
Builder* disable_write_page_index()
Builder* enable_page_checksum()
Builder* disable_page_checksum()
shared_ptr[WriterProperties] build()

cdef cppclass ArrowWriterProperties:
@@ -576,7 +581,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
FileEncryptionProperties encryption_properties=*,
write_batch_size=*,
dictionary_pagesize_limit=*,
write_page_index=*) except *
write_page_index=*,
write_page_checksum=*) except *


cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
22 changes: 18 additions & 4 deletions python/pyarrow/_parquet.pyx
@@ -1182,7 +1182,8 @@ cdef class ParquetReader(_Weakrefable):
coerce_int96_timestamp_unit=None,
FileDecryptionProperties decryption_properties=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
page_checksum_verification=False):
"""
Open a parquet file for reading.

@@ -1198,6 +1199,7 @@
decryption_properties : FileDecryptionProperties, optional
thrift_string_size_limit : int, optional
thrift_container_size_limit : int, optional
page_checksum_verification : bool, default False
"""
cdef:
shared_ptr[CFileMetaData] c_metadata
@@ -1235,6 +1237,8 @@

arrow_props.set_pre_buffer(pre_buffer)

properties.set_page_checksum_verification(page_checksum_verification)

if coerce_int96_timestamp_unit is None:
# use the default defined in default_arrow_reader_properties()
pass
@@ -1559,7 +1563,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
FileEncryptionProperties encryption_properties=None,
write_batch_size=None,
dictionary_pagesize_limit=None,
write_page_index=False) except *:
write_page_index=False,
write_page_checksum=False) except *:
"""General writer properties"""
cdef:
shared_ptr[WriterProperties] properties
@@ -1703,6 +1708,13 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
# a size larger than this then it will be latched to this value.
props.max_row_group_length(_MAX_ROW_GROUP_SIZE)

# checksum

if write_page_checksum:
props.enable_page_checksum()
else:
props.disable_page_checksum()

# page index

if write_page_index:
@@ -1822,7 +1834,8 @@ cdef class ParquetWriter(_Weakrefable):
write_batch_size=None,
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False):
write_page_index=False,
write_page_checksum=False):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1853,7 +1866,8 @@ cdef class ParquetWriter(_Weakrefable):
encryption_properties=encryption_properties,
write_batch_size=write_batch_size,
dictionary_pagesize_limit=dictionary_pagesize_limit,
write_page_index=write_page_index
write_page_index=write_page_index,
write_page_checksum=write_page_checksum
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
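A minimal end-to-end sketch of the new keywords as exposed through pyarrow.parquet (the file name "checksummed.parquet" is hypothetical):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"x": [1, 2, 3]})

# Write with page checksums enabled; this should map to
# WriterProperties::Builder::enable_page_checksum() declared above.
with pq.ParquetWriter("checksummed.parquet", table.schema,
                      write_page_checksum=True) as writer:
    writer.write_table(table)

# Read back with verification; this should map to
# ReaderProperties::set_page_checksum_verification(true).
pf = pq.ParquetFile("checksummed.parquet", page_checksum_verification=True)
restored = pf.read()
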
37 changes: 33 additions & 4 deletions python/pyarrow/parquet/core.py
@@ -280,6 +280,8 @@ class ParquetFile:
If nothing passed, will be inferred based on path.
Path will try to be found in the local on-disk filesystem otherwise
it will be parsed as a URI to determine the filesystem.
page_checksum_verification : bool, default False
If True, verify the checksum for each page read from the file.

Examples
--------
@@ -327,7 +329,8 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
read_dictionary=None, memory_map=False, buffer_size=0,
pre_buffer=False, coerce_int96_timestamp_unit=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None, filesystem=None):
thrift_container_size_limit=None, filesystem=None,
page_checksum_verification=False):

self._close_source = getattr(source, 'closed', True)

@@ -346,6 +349,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
decryption_properties=decryption_properties,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
)
self.common_metadata = common_metadata
self._nested_paths_by_prefix = self._build_nested_paths()
@@ -887,6 +891,10 @@ def _sanitize_table(table, new_schema, flavor):
filtering more efficient than the page header, as it gathers all the
statistics for a Parquet file in a single place, avoiding scattered I/O.
Note that the page index is not yet used on the read side by PyArrow.
write_page_checksum : bool, default False
Whether to write page checksums for all columns. Page checksums
enable detection of data corruption, which might occur during
transmission or in storage.
"""

_parquet_writer_example_doc = """\
@@ -980,6 +988,7 @@ def __init__(self, where, schema, filesystem=None,
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
write_page_checksum=False,
**options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
@@ -1037,6 +1046,7 @@ def __init__(self, where, schema, filesystem=None,
dictionary_pagesize_limit=dictionary_pagesize_limit,
store_schema=store_schema,
write_page_index=write_page_index,
write_page_checksum=write_page_checksum,
**options)
self.is_open = True

@@ -1766,6 +1776,8 @@ class ParquetDataset:
If not None, override the maximum total size of containers allocated
when decoding Thrift structures. The default limit should be
sufficient for most Parquet files.
page_checksum_verification : bool, default False
If True, verify the checksum for each page read from the file.

Examples
--------
@@ -1779,7 +1791,8 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
use_legacy_dataset=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
page_checksum_verification=False):

extra_msg = ""
if use_legacy_dataset is None:
@@ -1812,6 +1825,7 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
metadata_nthreads=metadata_nthreads,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
)
warnings.warn(
"Passing 'use_legacy_dataset=True' to get the legacy behaviour is "
@@ -1828,7 +1842,8 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
use_legacy_dataset=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
page_checksum_verification=False):
if partitioning != "hive":
raise ValueError(
'Only "hive" for hive-like partitioning is supported when '
@@ -2419,6 +2434,7 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None,
coerce_int96_timestamp_unit=None, schema=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None,
page_checksum_verification=False,
**kwargs):
import pyarrow.dataset as ds

@@ -2437,6 +2453,7 @@ def __init__(self, path_or_paths, filesystem=None, *, filters=None,
"coerce_int96_timestamp_unit": coerce_int96_timestamp_unit,
"thrift_string_size_limit": thrift_string_size_limit,
"thrift_container_size_limit": thrift_container_size_limit,
"page_checksum_verification": page_checksum_verification,
}
if buffer_size:
read_options.update(use_buffered_stream=True,
@@ -2855,6 +2872,8 @@ def partitioning(self):
If not None, override the maximum total size of containers allocated
when decoding Thrift structures. The default limit should be
sufficient for most Parquet files.
page_checksum_verification : bool, default False
If True, verify the checksum for each page read from the file.

Returns
-------
@@ -2949,7 +2968,8 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
ignore_prefixes=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None):
thrift_container_size_limit=None,
page_checksum_verification=False):
if not use_legacy_dataset:
if metadata is not None:
raise ValueError(
@@ -2973,6 +2993,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
)
except ImportError:
# fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -3004,6 +3025,7 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
decryption_properties=decryption_properties,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
)

return dataset.read(columns=columns, use_threads=use_threads,
@@ -3020,6 +3042,11 @@ def read_table(source, *, columns=None, use_threads=True, metadata=None,
"The 'ignore_prefixes' keyword is only supported when "
"use_legacy_dataset=False")

if page_checksum_verification:
raise ValueError(
"The 'page_checksum_verification' keyword is only supported when "
"use_legacy_dataset=False")

if schema is not None:
raise ValueError(
"The 'schema' argument is only supported when "
@@ -3101,6 +3128,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
write_page_checksum=False,
**kwargs):
# Implementor's note: when adding keywords here / updating defaults, also
# update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions
@@ -3129,6 +3157,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
dictionary_pagesize_limit=dictionary_pagesize_limit,
store_schema=store_schema,
write_page_index=write_page_index,
write_page_checksum=write_page_checksum,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
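And a minimal sketch of the same feature through the module-level read_table / write_table helpers changed above (the file name is hypothetical; the read-side option requires the dataset-based code path, i.e. the default use_legacy_dataset=False):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"x": [1, 2, 3]})

# Round-trip with checksums written and then verified on read.
pq.write_table(table, "data_with_checksums.parquet", write_page_checksum=True)
result = pq.read_table("data_with_checksums.parquet",
                       page_checksum_verification=True)

# As guarded above, combining page_checksum_verification with
# use_legacy_dataset=True raises ValueError.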