diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 4b435ba1d1c..0b66ea0e0b0 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -929,7 +929,7 @@ cdef class ParquetReader(_Weakrefable): def open(self, object source not None, bint use_memory_map=True, read_dictionary=None, FileMetaData metadata=None, - int buffer_size=0): + int buffer_size=0, bint pre_buffer=False): cdef: shared_ptr[CRandomAccessFile] rd_handle shared_ptr[CFileMetaData] c_metadata @@ -950,6 +950,8 @@ cdef class ParquetReader(_Weakrefable): else: raise ValueError('Buffer size must be larger than zero') + arrow_props.set_pre_buffer(pre_buffer) + self.source = source get_reader(source, use_memory_map, &rd_handle) diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 88683d95013..97e431c4179 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -209,13 +209,18 @@ class ParquetFile: buffer_size : int, default 0 If positive, perform read buffering when deserializing individual column chunks. Otherwise IO calls are unbuffered. + pre_buffer : bool, default False + Coalesce and issue file reads in parallel to improve performance on + high-latency filesystems (e.g. S3). If True, Arrow will use a + background I/O thread pool. """ def __init__(self, source, metadata=None, common_metadata=None, - read_dictionary=None, memory_map=False, buffer_size=0): + read_dictionary=None, memory_map=False, buffer_size=0, + pre_buffer=False): self.reader = ParquetReader() self.reader.open(source, use_memory_map=memory_map, - buffer_size=buffer_size, + buffer_size=buffer_size, pre_buffer=pre_buffer, read_dictionary=read_dictionary, metadata=metadata) self.common_metadata = common_metadata self._nested_paths_by_prefix = self._build_nested_paths() @@ -1212,13 +1217,20 @@ class ParquetDataset: new Arrow Dataset API). Among other things, this allows to pass `filters` for all columns and not only the partition keys, enables different partitioning schemes, etc. +pre_buffer : bool, default True + Coalesce and issue file reads in parallel to improve performance on + high-latency filesystems (e.g. S3). If True, Arrow will use a + background I/O thread pool. This option is only supported for + use_legacy_dataset=False. If using a filesystem layer that itself + performs readahead (e.g. fsspec's S3FS), disable readahead for best + results. """.format(_read_docstring_common, _DNF_filter_doc) def __new__(cls, path_or_paths=None, filesystem=None, schema=None, metadata=None, split_row_groups=False, validate_schema=True, filters=None, metadata_nthreads=1, read_dictionary=None, memory_map=False, buffer_size=0, partitioning="hive", - use_legacy_dataset=None): + use_legacy_dataset=None, pre_buffer=True): if use_legacy_dataset is None: # if a new filesystem is passed -> default to new implementation if isinstance(filesystem, FileSystem): @@ -1234,6 +1246,7 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None, read_dictionary=read_dictionary, memory_map=memory_map, buffer_size=buffer_size, + pre_buffer=pre_buffer, # unsupported keywords schema=schema, metadata=metadata, split_row_groups=split_row_groups, @@ -1246,7 +1259,7 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, metadata=None, split_row_groups=False, validate_schema=True, filters=None, metadata_nthreads=1, read_dictionary=None, memory_map=False, buffer_size=0, partitioning="hive", - use_legacy_dataset=True): + use_legacy_dataset=True, pre_buffer=True): if partitioning != "hive": raise ValueError( 'Only "hive" for hive-like partitioning is supported when ' @@ -1480,7 +1493,8 @@ class _ParquetDatasetV2: def __init__(self, path_or_paths, filesystem=None, filters=None, partitioning="hive", read_dictionary=None, buffer_size=None, - memory_map=False, ignore_prefixes=None, **kwargs): + memory_map=False, ignore_prefixes=None, pre_buffer=True, + **kwargs): import pyarrow.dataset as ds # Raise error for not supported keywords @@ -1494,7 +1508,7 @@ def __init__(self, path_or_paths, filesystem=None, filters=None, "Dataset API".format(keyword)) # map format arguments - read_options = {} + read_options = {"pre_buffer": pre_buffer} if buffer_size: read_options.update(use_buffered_stream=True, buffer_size=buffer_size) @@ -1676,6 +1690,13 @@ def pieces(self): keys and only a hive-style directory structure is supported. When setting `use_legacy_dataset` to False, also within-file level filtering and different partitioning schemes are supported. +pre_buffer : bool, default True + Coalesce and issue file reads in parallel to improve performance on + high-latency filesystems (e.g. S3). If True, Arrow will use a + background I/O thread pool. This option is only supported for + use_legacy_dataset=False. If using a filesystem layer that itself + performs readahead (e.g. fsspec's S3FS), disable readahead for best + results. {3} @@ -1689,7 +1710,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None, use_pandas_metadata=False, memory_map=False, read_dictionary=None, filesystem=None, filters=None, buffer_size=0, partitioning="hive", use_legacy_dataset=False, - ignore_prefixes=None): + ignore_prefixes=None, pre_buffer=True): if not use_legacy_dataset: if metadata is not None: raise ValueError( @@ -1708,6 +1729,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None, buffer_size=buffer_size, filters=filters, ignore_prefixes=ignore_prefixes, + pre_buffer=pre_buffer, ) except ImportError: # fall back on ParquetFile for simple cases when pyarrow.dataset @@ -1728,7 +1750,8 @@ def read_table(source, columns=None, use_threads=True, metadata=None, # TODO test that source is not a directory or a list dataset = ParquetFile( source, metadata=metadata, read_dictionary=read_dictionary, - memory_map=memory_map, buffer_size=buffer_size) + memory_map=memory_map, buffer_size=buffer_size, + pre_buffer=pre_buffer) return dataset.read(columns=columns, use_threads=use_threads, use_pandas_metadata=use_pandas_metadata) diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index fce6ae58af7..70ea37b5af0 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -1021,6 +1021,27 @@ def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset): assert dataset.read().equals(table) +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_dataset_enable_pre_buffer(tempdir, use_legacy_dataset): + dirpath = tempdir / guid() + dirpath.mkdir() + + df = _test_dataframe(10, seed=0) + path = dirpath / '{}.parquet'.format(0) + table = pa.Table.from_pandas(df) + _write_table(table, path, version='2.0') + + for pre_buffer in (True, False): + dataset = pq.ParquetDataset( + dirpath, pre_buffer=pre_buffer, + use_legacy_dataset=use_legacy_dataset) + assert dataset.read().equals(table) + actual = pq.read_table(dirpath, pre_buffer=pre_buffer, + use_legacy_dataset=use_legacy_dataset) + assert actual.equals(table) + + def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5): test_data = [] paths = [] diff --git a/python/pyarrow/tests/parquet/test_parquet_file.py b/python/pyarrow/tests/parquet/test_parquet_file.py index 85f81a3423e..dc9a3bb5274 100644 --- a/python/pyarrow/tests/parquet/test_parquet_file.py +++ b/python/pyarrow/tests/parquet/test_parquet_file.py @@ -256,3 +256,19 @@ def get_all_batches(f): ) batch_no += 1 + + +@pytest.mark.pandas +@pytest.mark.parametrize('pre_buffer', [False, True]) +def test_pre_buffer(pre_buffer): + N, K = 10000, 4 + df = alltypes_sample(size=N) + a_table = pa.Table.from_pandas(df) + + buf = io.BytesIO() + _write_table(a_table, buf, row_group_size=N / K, + compression='snappy', version='2.0') + + buf.seek(0) + pf = pq.ParquetFile(buf, pre_buffer=pre_buffer) + assert pf.read().num_rows == N diff --git a/python/setup.py b/python/setup.py index 24d54809a42..fac8e0b32e3 100755 --- a/python/setup.py +++ b/python/setup.py @@ -110,6 +110,7 @@ def run(self): ('with-flight', None, 'build the Flight extension'), ('with-dataset', None, 'build the Dataset extension'), ('with-parquet', None, 'build the Parquet extension'), + ('with-s3', None, 'build the Amazon S3 extension'), ('with-static-parquet', None, 'link parquet statically'), ('with-static-boost', None, 'link boost statically'), ('with-plasma', None, 'build the Plasma extension'),