From 556e3ca5cd74e6117d2cfaea2ef9ef62d2e170da Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 16 Apr 2021 16:01:15 -0400
Subject: [PATCH 1/4] ARROW-12428: [Python] Expose pre_buffer in pyarrow.parquet

---
 python/pyarrow/_parquet.pyx                  |  4 ++-
 python/pyarrow/parquet.py                    | 36 ++++++++++++++-----
 python/pyarrow/tests/parquet/test_dataset.py | 21 +++++++++++
 .../tests/parquet/test_parquet_file.py       | 16 +++++++++
 python/setup.py                              |  1 +
 5 files changed, 69 insertions(+), 9 deletions(-)

diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 4b435ba1d1c..0b66ea0e0b0 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -929,7 +929,7 @@ cdef class ParquetReader(_Weakrefable):
 
     def open(self, object source not None, bint use_memory_map=True,
              read_dictionary=None, FileMetaData metadata=None,
-             int buffer_size=0):
+             int buffer_size=0, bint pre_buffer=False):
         cdef:
             shared_ptr[CRandomAccessFile] rd_handle
             shared_ptr[CFileMetaData] c_metadata
@@ -950,6 +950,8 @@ cdef class ParquetReader(_Weakrefable):
         else:
             raise ValueError('Buffer size must be larger than zero')
 
+        arrow_props.set_pre_buffer(pre_buffer)
+
         self.source = source
 
         get_reader(source, use_memory_map, &rd_handle)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 88683d95013..93fe86ffa4c 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -209,13 +209,17 @@ class ParquetFile:
     buffer_size : int, default 0
         If positive, perform read buffering when deserializing individual
         column chunks. Otherwise IO calls are unbuffered.
+    pre_buffer : bool, default False
+        Coalesce and issue file reads in parallel to improve performance on
+        high-latency filesystems (e.g. S3).
     """

     def __init__(self, source, metadata=None, common_metadata=None,
-                 read_dictionary=None, memory_map=False, buffer_size=0):
+                 read_dictionary=None, memory_map=False, buffer_size=0,
+                 pre_buffer=False):
         self.reader = ParquetReader()
         self.reader.open(source, use_memory_map=memory_map,
-                         buffer_size=buffer_size,
+                         buffer_size=buffer_size, pre_buffer=pre_buffer,
                          read_dictionary=read_dictionary, metadata=metadata)
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
@@ -1212,13 +1216,19 @@ class ParquetDataset:
     new Arrow Dataset API). Among other things, this allows to pass
     `filters` for all columns and not only the partition keys, enables
     different partitioning schemes, etc.
+pre_buffer : bool, default True
+    Coalesce and issue file reads in parallel to improve performance on
+    high-latency filesystems (e.g. S3). This option is only supported for
+    use_legacy_dataset=True. If using a filesystem layer that itself
+    performs readahead (e.g. fsspec's S3FS), disable readahead for best
+    results.
""".format(_read_docstring_common, _DNF_filter_doc) def __new__(cls, path_or_paths=None, filesystem=None, schema=None, metadata=None, split_row_groups=False, validate_schema=True, filters=None, metadata_nthreads=1, read_dictionary=None, memory_map=False, buffer_size=0, partitioning="hive", - use_legacy_dataset=None): + use_legacy_dataset=None, pre_buffer=True): if use_legacy_dataset is None: # if a new filesystem is passed -> default to new implementation if isinstance(filesystem, FileSystem): @@ -1234,6 +1244,7 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None, read_dictionary=read_dictionary, memory_map=memory_map, buffer_size=buffer_size, + pre_buffer=pre_buffer, # unsupported keywords schema=schema, metadata=metadata, split_row_groups=split_row_groups, @@ -1246,7 +1257,7 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, metadata=None, split_row_groups=False, validate_schema=True, filters=None, metadata_nthreads=1, read_dictionary=None, memory_map=False, buffer_size=0, partitioning="hive", - use_legacy_dataset=True): + use_legacy_dataset=True, pre_buffer=True): if partitioning != "hive": raise ValueError( 'Only "hive" for hive-like partitioning is supported when ' @@ -1480,7 +1491,8 @@ class _ParquetDatasetV2: def __init__(self, path_or_paths, filesystem=None, filters=None, partitioning="hive", read_dictionary=None, buffer_size=None, - memory_map=False, ignore_prefixes=None, **kwargs): + memory_map=False, ignore_prefixes=None, pre_buffer=True, + **kwargs): import pyarrow.dataset as ds # Raise error for not supported keywords @@ -1494,7 +1506,7 @@ def __init__(self, path_or_paths, filesystem=None, filters=None, "Dataset API".format(keyword)) # map format arguments - read_options = {} + read_options = {"pre_buffer": pre_buffer} if buffer_size: read_options.update(use_buffered_stream=True, buffer_size=buffer_size) @@ -1676,6 +1688,12 @@ def pieces(self): keys and only a hive-style directory structure is supported. When setting `use_legacy_dataset` to False, also within-file level filtering and different partitioning schemes are supported. +pre_buffer : bool, default True + Coalesce and issue file reads in parallel to improve performance on + high-latency filesystems (e.g. S3). This option is only supported for + use_legacy_dataset=True. If using a filesystem layer that itself + performs readahead (e.g. fsspec's S3FS), disable readahead for best + results. 
 {3}
@@ -1689,7 +1707,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
                use_pandas_metadata=False, memory_map=False,
                read_dictionary=None, filesystem=None, filters=None,
                buffer_size=0, partitioning="hive", use_legacy_dataset=False,
-               ignore_prefixes=None):
+               ignore_prefixes=None, pre_buffer=True):
     if not use_legacy_dataset:
         if metadata is not None:
             raise ValueError(
@@ -1708,6 +1726,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
                 buffer_size=buffer_size,
                 filters=filters,
                 ignore_prefixes=ignore_prefixes,
+                pre_buffer=pre_buffer,
             )
         except ImportError:
             # fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -1728,7 +1747,8 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
         # TODO test that source is not a directory or a list
         dataset = ParquetFile(
             source, metadata=metadata, read_dictionary=read_dictionary,
-            memory_map=memory_map, buffer_size=buffer_size)
+            memory_map=memory_map, buffer_size=buffer_size,
+            pre_buffer=pre_buffer)

     return dataset.read(columns=columns, use_threads=use_threads,
                         use_pandas_metadata=use_pandas_metadata)
diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py
index fce6ae58af7..70ea37b5af0 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -1021,6 +1021,27 @@ def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset):
     assert dataset.read().equals(table)


+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_enable_pre_buffer(tempdir, use_legacy_dataset):
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    df = _test_dataframe(10, seed=0)
+    path = dirpath / '{}.parquet'.format(0)
+    table = pa.Table.from_pandas(df)
+    _write_table(table, path, version='2.0')
+
+    for pre_buffer in (True, False):
+        dataset = pq.ParquetDataset(
+            dirpath, pre_buffer=pre_buffer,
+            use_legacy_dataset=use_legacy_dataset)
+        assert dataset.read().equals(table)
+        actual = pq.read_table(dirpath, pre_buffer=pre_buffer,
+                               use_legacy_dataset=use_legacy_dataset)
+        assert actual.equals(table)
+
+
 def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):
     test_data = []
     paths = []
diff --git a/python/pyarrow/tests/parquet/test_parquet_file.py b/python/pyarrow/tests/parquet/test_parquet_file.py
index 85f81a3423e..dc9a3bb5274 100644
--- a/python/pyarrow/tests/parquet/test_parquet_file.py
+++ b/python/pyarrow/tests/parquet/test_parquet_file.py
@@ -256,3 +256,19 @@ def get_all_batches(f):
         )

         batch_no += 1
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('pre_buffer', [False, True])
+def test_pre_buffer(pre_buffer):
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+    pf = pq.ParquetFile(buf, pre_buffer=pre_buffer)
+    assert pf.read().num_rows == N
diff --git a/python/setup.py b/python/setup.py
index 24d54809a42..fac8e0b32e3 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -110,6 +110,7 @@ def run(self):
         ('with-flight', None, 'build the Flight extension'),
         ('with-dataset', None, 'build the Dataset extension'),
         ('with-parquet', None, 'build the Parquet extension'),
+        ('with-s3', None, 'build the Amazon S3 extension'),
         ('with-static-parquet', None, 'link parquet statically'),
         ('with-static-boost', None, 'link boost statically'),
         ('with-plasma', None, 'build the Plasma extension'),
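As context for the change above, a minimal usage sketch (not part of the patch itself) of the keyword that patch 1/4 adds; the local file path is a hypothetical placeholder, not taken from the diff.

    import pyarrow.parquet as pq

    # ParquetFile gains pre_buffer (default False in this patch); opting in
    # coalesces column-chunk reads, which mainly helps high-latency stores.
    pf = pq.ParquetFile("example.parquet", pre_buffer=True)  # placeholder path
    table = pf.read()

    # read_table gains the same keyword, defaulting to True.
    table = pq.read_table("example.parquet", pre_buffer=False)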
From f0c8c613375b00c012793bd7b9976205af742a60 Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 19 Apr 2021 08:44:07 -0400
Subject: [PATCH 2/4] ARROW-12428: [Python] Tweak pre_buffer docstring

---
 python/pyarrow/parquet.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 93fe86ffa4c..d21c86813aa 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -211,7 +211,8 @@ class ParquetFile:
         column chunks. Otherwise IO calls are unbuffered.
     pre_buffer : bool, default False
         Coalesce and issue file reads in parallel to improve performance on
-        high-latency filesystems (e.g. S3).
+        high-latency filesystems (e.g. S3). If True, Arrow will use a
+        background I/O thread pool.
     """

     def __init__(self, source, metadata=None, common_metadata=None,
@@ -1218,7 +1219,8 @@ class ParquetDataset:
     different partitioning schemes, etc.
 pre_buffer : bool, default True
     Coalesce and issue file reads in parallel to improve performance on
-    high-latency filesystems (e.g. S3). This option is only supported for
+    high-latency filesystems (e.g. S3). If True, Arrow will use a
+    background I/O thread pool. This option is only supported for
     use_legacy_dataset=True. If using a filesystem layer that itself
     performs readahead (e.g. fsspec's S3FS), disable readahead for best
     results.
@@ -1690,7 +1692,8 @@ def pieces(self):
     and different partitioning schemes are supported.
 pre_buffer : bool, default True
     Coalesce and issue file reads in parallel to improve performance on
-    high-latency filesystems (e.g. S3). This option is only supported for
+    high-latency filesystems (e.g. S3). If True, Arrow will use a
+    background I/O thread pool. This option is only supported for
     use_legacy_dataset=True. If using a filesystem layer that itself
     performs readahead (e.g. fsspec's S3FS), disable readahead for best
     results.

From 6eced0943013af463ae70c18e760603206f5502c Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 4 May 2021 13:39:56 -0400
Subject: [PATCH 3/4] Update python/pyarrow/parquet.py

Co-authored-by: Joris Van den Bossche
---
 python/pyarrow/parquet.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index d21c86813aa..2f97d5d7b7c 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1221,7 +1221,7 @@ class ParquetDataset:
     Coalesce and issue file reads in parallel to improve performance on
     high-latency filesystems (e.g. S3). If True, Arrow will use a
     background I/O thread pool. This option is only supported for
-    use_legacy_dataset=True. If using a filesystem layer that itself
+    use_legacy_dataset=False. If using a filesystem layer that itself
     performs readahead (e.g. fsspec's S3FS), disable readahead for best
     results.
 """.format(_read_docstring_common, _DNF_filter_doc)

From cdc3029be97a596f3a3b2f942df35fb858dc3af0 Mon Sep 17 00:00:00 2001
From: Joris Van den Bossche
Date: Tue, 4 May 2021 19:42:49 +0200
Subject: [PATCH 4/4] Update python/pyarrow/parquet.py

---
 python/pyarrow/parquet.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 2f97d5d7b7c..97e431c4179 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1694,7 +1694,7 @@ def pieces(self):
     Coalesce and issue file reads in parallel to improve performance on
     high-latency filesystems (e.g. S3). If True, Arrow will use a
     background I/O thread pool. This option is only supported for
-    use_legacy_dataset=True. If using a filesystem layer that itself
+    use_legacy_dataset=False. If using a filesystem layer that itself
     performs readahead (e.g. fsspec's S3FS), disable readahead for best
     results.
 {3}
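To round out the series, a hedged end-to-end sketch (not part of the patches) of the S3 scenario the docstrings describe; the bucket, prefix, and region below are assumed placeholders.

    import pyarrow.fs as fs
    import pyarrow.parquet as pq

    # Placeholder bucket/region; pre_buffer=True is already the default for
    # the dataset-based path and is shown explicitly here for clarity.
    s3 = fs.S3FileSystem(region="us-east-1")
    dataset = pq.ParquetDataset("my-bucket/warehouse/events/", filesystem=s3,
                                use_legacy_dataset=False, pre_buffer=True)
    table = dataset.read()

    # Per the docstring above, if the filesystem layer does its own readahead
    # (e.g. fsspec's S3FS), disable that readahead rather than stacking both.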