4 changes: 3 additions & 1 deletion python/pyarrow/_parquet.pyx
@@ -929,7 +929,7 @@ cdef class ParquetReader(_Weakrefable):

     def open(self, object source not None, bint use_memory_map=True,
              read_dictionary=None, FileMetaData metadata=None,
-             int buffer_size=0):
+             int buffer_size=0, bint pre_buffer=False):
         cdef:
             shared_ptr[CRandomAccessFile] rd_handle
             shared_ptr[CFileMetaData] c_metadata
@@ -950,6 +950,8 @@ cdef class ParquetReader(_Weakrefable):
         else:
             raise ValueError('Buffer size must be larger than zero')

+        arrow_props.set_pre_buffer(pre_buffer)
+
         self.source = source

         get_reader(source, use_memory_map, &rd_handle)
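The Cython change above only threads the new flag into Arrow's reader properties via set_pre_buffer; the user-facing entry point is the pre_buffer keyword added to ParquetFile in parquet.py below. A minimal usage sketch of the new flag (the file name is a placeholder):

    import pyarrow.parquet as pq

    # With pre_buffer=True the reader coalesces the column-chunk reads and
    # issues them in parallel on Arrow's background I/O thread pool.
    pf = pq.ParquetFile("example.parquet", pre_buffer=True)
    table = pf.read()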
39 changes: 31 additions & 8 deletions python/pyarrow/parquet.py
@@ -209,13 +209,18 @@ class ParquetFile:
     buffer_size : int, default 0
         If positive, perform read buffering when deserializing individual
         column chunks. Otherwise IO calls are unbuffered.
+    pre_buffer : bool, default False
+        Coalesce and issue file reads in parallel to improve performance on
+        high-latency filesystems (e.g. S3). If True, Arrow will use a
+        background I/O thread pool.
     """

     def __init__(self, source, metadata=None, common_metadata=None,
-                 read_dictionary=None, memory_map=False, buffer_size=0):
+                 read_dictionary=None, memory_map=False, buffer_size=0,
+                 pre_buffer=False):
         self.reader = ParquetReader()
         self.reader.open(source, use_memory_map=memory_map,
-                         buffer_size=buffer_size,
+                         buffer_size=buffer_size, pre_buffer=pre_buffer,
                          read_dictionary=read_dictionary, metadata=metadata)
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
@@ -1212,13 +1217,20 @@ class ParquetDataset:
     new Arrow Dataset API). Among other things, this allows to pass
     `filters` for all columns and not only the partition keys, enables
     different partitioning schemes, etc.
+pre_buffer : bool, default True
+    Coalesce and issue file reads in parallel to improve performance on
+    high-latency filesystems (e.g. S3). If True, Arrow will use a
+    background I/O thread pool. This option is only supported for
+    use_legacy_dataset=False. If using a filesystem layer that itself
+    performs readahead (e.g. fsspec's S3FS), disable readahead for best
+    results.
 """.format(_read_docstring_common, _DNF_filter_doc)

     def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
                 metadata=None, split_row_groups=False, validate_schema=True,
                 filters=None, metadata_nthreads=1, read_dictionary=None,
                 memory_map=False, buffer_size=0, partitioning="hive",
-                use_legacy_dataset=None):
+                use_legacy_dataset=None, pre_buffer=True):
         if use_legacy_dataset is None:
             # if a new filesystem is passed -> default to new implementation
             if isinstance(filesystem, FileSystem):
@@ -1234,6 +1246,7 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
                 read_dictionary=read_dictionary,
                 memory_map=memory_map,
                 buffer_size=buffer_size,
+                pre_buffer=pre_buffer,
                 # unsupported keywords
                 schema=schema, metadata=metadata,
                 split_row_groups=split_row_groups,
@@ -1246,7 +1259,7 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
                  metadata=None, split_row_groups=False, validate_schema=True,
                  filters=None, metadata_nthreads=1, read_dictionary=None,
                  memory_map=False, buffer_size=0, partitioning="hive",
-                 use_legacy_dataset=True):
+                 use_legacy_dataset=True, pre_buffer=True):
         if partitioning != "hive":
             raise ValueError(
                 'Only "hive" for hive-like partitioning is supported when '
@@ -1480,7 +1493,8 @@ class _ParquetDatasetV2:

     def __init__(self, path_or_paths, filesystem=None, filters=None,
                  partitioning="hive", read_dictionary=None, buffer_size=None,
-                 memory_map=False, ignore_prefixes=None, **kwargs):
+                 memory_map=False, ignore_prefixes=None, pre_buffer=True,
+                 **kwargs):
         import pyarrow.dataset as ds

         # Raise error for not supported keywords
@@ -1494,7 +1508,7 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
                     "Dataset API".format(keyword))

         # map format arguments
-        read_options = {}
+        read_options = {"pre_buffer": pre_buffer}
         if buffer_size:
             read_options.update(use_buffered_stream=True,
                                 buffer_size=buffer_size)
@@ -1676,6 +1690,13 @@ def pieces(self):
     keys and only a hive-style directory structure is supported. When
     setting `use_legacy_dataset` to False, also within-file level filtering
     and different partitioning schemes are supported.
+pre_buffer : bool, default True
+    Coalesce and issue file reads in parallel to improve performance on
+    high-latency filesystems (e.g. S3). If True, Arrow will use a
+    background I/O thread pool. This option is only supported for
+    use_legacy_dataset=False. If using a filesystem layer that itself
+    performs readahead (e.g. fsspec's S3FS), disable readahead for best
+    results.

 {3}

@@ -1689,7 +1710,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
                use_pandas_metadata=False, memory_map=False,
                read_dictionary=None, filesystem=None, filters=None,
                buffer_size=0, partitioning="hive", use_legacy_dataset=False,
-               ignore_prefixes=None):
+               ignore_prefixes=None, pre_buffer=True):
     if not use_legacy_dataset:
         if metadata is not None:
             raise ValueError(
@@ -1708,6 +1729,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
             buffer_size=buffer_size,
             filters=filters,
             ignore_prefixes=ignore_prefixes,
+            pre_buffer=pre_buffer,
         )
     except ImportError:
         # fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -1728,7 +1750,8 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
     # TODO test that source is not a directory or a list
     dataset = ParquetFile(
         source, metadata=metadata, read_dictionary=read_dictionary,
-        memory_map=memory_map, buffer_size=buffer_size)
+        memory_map=memory_map, buffer_size=buffer_size,
+        pre_buffer=pre_buffer)

     return dataset.read(columns=columns, use_threads=use_threads,
                         use_pandas_metadata=use_pandas_metadata)
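As the docstrings above note, pre_buffer mainly pays off on high-latency filesystems and is only honored by the new dataset-based code path. A usage sketch under those assumptions (bucket, path, and region are placeholders; the pyarrow build must include S3 support):

    import pyarrow.parquet as pq
    from pyarrow import fs

    s3 = fs.S3FileSystem(region="us-east-1")
    # pre_buffer=True is already the default for read_table; shown explicitly.
    # When reading through a readahead-enabled layer such as fsspec's S3FS,
    # disable that layer's readahead rather than stacking both.
    table = pq.read_table("my-bucket/dataset/", filesystem=s3,
                          pre_buffer=True, use_legacy_dataset=False)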
21 changes: 21 additions & 0 deletions python/pyarrow/tests/parquet/test_dataset.py
@@ -1021,6 +1021,27 @@ def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset):
     assert dataset.read().equals(table)


+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_enable_pre_buffer(tempdir, use_legacy_dataset):
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    df = _test_dataframe(10, seed=0)
+    path = dirpath / '{}.parquet'.format(0)
+    table = pa.Table.from_pandas(df)
+    _write_table(table, path, version='2.0')
+
+    for pre_buffer in (True, False):
+        dataset = pq.ParquetDataset(
+            dirpath, pre_buffer=pre_buffer,
+            use_legacy_dataset=use_legacy_dataset)
+        assert dataset.read().equals(table)
+        actual = pq.read_table(dirpath, pre_buffer=pre_buffer,
+                               use_legacy_dataset=use_legacy_dataset)
+        assert actual.equals(table)
+
+
 def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):
     test_data = []
     paths = []
16 changes: 16 additions & 0 deletions python/pyarrow/tests/parquet/test_parquet_file.py
@@ -256,3 +256,19 @@ def get_all_batches(f):
         )

         batch_no += 1
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('pre_buffer', [False, True])
+def test_pre_buffer(pre_buffer):
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+    pf = pq.ParquetFile(buf, pre_buffer=pre_buffer)
+    assert pf.read().num_rows == N
1 change: 1 addition & 0 deletions python/setup.py
@@ -110,6 +110,7 @@ def run(self):
         ('with-flight', None, 'build the Flight extension'),
         ('with-dataset', None, 'build the Dataset extension'),
         ('with-parquet', None, 'build the Parquet extension'),
+        ('with-s3', None, 'build the Amazon S3 extension'),
         ('with-static-parquet', None, 'link parquet statically'),
         ('with-static-boost', None, 'link boost statically'),
         ('with-plasma', None, 'build the Plasma extension'),
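The new --with-s3 flag presumably mirrors the other optional-extension switches and controls whether the Arrow S3 filesystem is included in the build. A quick, hedged way to check whether a given pyarrow build has it (region is a placeholder):

    from pyarrow import fs

    # S3FileSystem is only exposed in pyarrow.fs when the build includes
    # S3 support; otherwise the attribute is simply absent.
    if hasattr(fs, "S3FileSystem"):
        s3 = fs.S3FileSystem(region="us-east-1")
    else:
        print("this pyarrow build does not include S3 support")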