diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 22763680cd1..5932578dc24 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -27,6 +27,7 @@
 import re
 import operator
 import urllib.parse
+import warnings

 import pyarrow as pa
 import pyarrow.lib as lib
@@ -705,11 +706,17 @@ def _get_pandas_index_columns(keyvalues):

 class ParquetDatasetPiece:
     """
-    A single chunk of a potentially larger Parquet dataset to read.
+    DEPRECATED: A single chunk of a potentially larger Parquet dataset to read.

     The arguments will indicate to read either a single row group or all row
     groups, and whether to add partition keys to the resulting pyarrow.Table.

+    .. deprecated:: 5.0
+        Directly constructing a ``ParquetDatasetPiece`` is deprecated, as well
+        as accessing the pieces of a ``ParquetDataset`` object. Specify
+        ``use_legacy_dataset=False`` when constructing the ``ParquetDataset``
+        and use the ``ParquetDataset.fragments`` attribute instead.
+
     Parameters
     ----------
     path : str or pathlib.Path
@@ -724,6 +731,23 @@ class ParquetDatasetPiece:

     def __init__(self, path, open_file_func=partial(open, mode='rb'),
                  file_options=None, row_group=None, partition_keys=None):
+        warnings.warn(
+            "ParquetDatasetPiece is deprecated as of pyarrow 5.0.0 and will "
+            "be removed in a future version.",
+            DeprecationWarning, stacklevel=2)
+        self._init(
+            path, open_file_func, file_options, row_group, partition_keys)
+
+    @staticmethod
+    def _create(path, open_file_func=partial(open, mode='rb'),
+                file_options=None, row_group=None, partition_keys=None):
+        self = ParquetDatasetPiece.__new__(ParquetDatasetPiece)
+        self._init(
+            path, open_file_func, file_options, row_group, partition_keys)
+        return self
+
+    def _init(self, path, open_file_func, file_options, row_group,
+              partition_keys):
         self.path = _stringify_path(path)
         self.open_file_func = open_file_func
         self.row_group = row_group
@@ -1106,8 +1130,8 @@ def _parse_partition(self, dirname):

     def _push_pieces(self, files, part_keys):
         self.pieces.extend([
-            ParquetDatasetPiece(path, partition_keys=part_keys,
-                                open_file_func=self.open_file_func)
+            ParquetDatasetPiece._create(path, partition_keys=part_keys,
+                                        open_file_func=self.open_file_func)
             for path in files
         ])

@@ -1153,6 +1177,12 @@ def _open_dataset_file(dataset, path, meta=None):
     )


+_DEPR_MSG = (
+    "'{}' attribute is deprecated as of pyarrow 5.0.0 and will be removed "
+    "in a future version.{}"
+)
+
+
 _read_docstring_common = """\
 read_dictionary : list, default None
     List of names or column paths (for nested types) to read directly
@@ -1279,16 +1309,16 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
         self._metadata.memory_map = memory_map
         self._metadata.buffer_size = buffer_size

-        (self.pieces,
-         self.partitions,
+        (self._pieces,
+         self._partitions,
          self.common_metadata_path,
          self.metadata_path) = _make_manifest(
-             path_or_paths, self.fs, metadata_nthreads=metadata_nthreads,
+             path_or_paths, self._fs, metadata_nthreads=metadata_nthreads,
             open_file_func=partial(_open_dataset_file, self._metadata)
         )

         if self.common_metadata_path is not None:
-            with self.fs.open(self.common_metadata_path) as f:
+            with self._fs.open(self.common_metadata_path) as f:
                 self._metadata.common_metadata = read_metadata(
                     f,
                     memory_map=memory_map
@@ -1297,7 +1327,7 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
             self._metadata.common_metadata = None

         if metadata is None and self.metadata_path is not None:
-            with self.fs.open(self.metadata_path) as f:
+            with self._fs.open(self.metadata_path) as f:
                 self.metadata = read_metadata(f, memory_map=memory_map)
         else:
             self.metadata = metadata
@@ -1320,14 +1350,17 @@ def equals(self, other):
         if not isinstance(other, ParquetDataset):
             raise TypeError('`other` must be an instance of ParquetDataset')

-        if self.fs.__class__ != other.fs.__class__:
+        if self._fs.__class__ != other._fs.__class__:
             return False
-        for prop in ('paths', 'memory_map', 'pieces', 'partitions',
+        for prop in ('paths', '_pieces', '_partitions',
                      'common_metadata_path', 'metadata_path',
                      'common_metadata', 'metadata', 'schema',
-                     'buffer_size', 'split_row_groups'):
+                     'split_row_groups'):
             if getattr(self, prop) != getattr(other, prop):
                 return False
+        for prop in ('memory_map', 'buffer_size'):
+            if getattr(self._metadata, prop) != getattr(other._metadata, prop):
+                return False

         return True

@@ -1342,7 +1375,7 @@ def validate_schemas(self):
             if self.common_metadata is not None:
                 self.schema = self.common_metadata.schema
             else:
-                self.schema = self.pieces[0].get_metadata().schema
+                self.schema = self._pieces[0].get_metadata().schema
         elif self.schema is None:
             self.schema = self.metadata.schema

@@ -1350,13 +1383,13 @@ def validate_schemas(self):
         dataset_schema = self.schema.to_arrow_schema()
         # Exclude the partition columns from the schema, they are provided
         # by the path, not the DatasetPiece
-        if self.partitions is not None:
-            for partition_name in self.partitions.partition_names:
+        if self._partitions is not None:
+            for partition_name in self._partitions.partition_names:
                 if dataset_schema.get_field_index(partition_name) != -1:
                     field_idx = dataset_schema.get_field_index(partition_name)
                     dataset_schema = dataset_schema.remove(field_idx)

-        for piece in self.pieces:
+        for piece in self._pieces:
             file_metadata = piece.get_metadata()
             file_schema = file_metadata.schema.to_arrow_schema()
             if not dataset_schema.equals(file_schema, check_metadata=False):
@@ -1384,9 +1417,9 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
            Content of the file as a table (of columns).
""" tables = [] - for piece in self.pieces: + for piece in self._pieces: table = piece.read(columns=columns, use_threads=use_threads, - partitions=self.partitions, + partitions=self._partitions, use_pandas_metadata=use_pandas_metadata) tables.append(table) @@ -1425,7 +1458,7 @@ def _get_common_pandas_metadata(self): return keyvalues.get(b'pandas', None) def _filter(self, filters): - accepts_filter = self.partitions.filter_accepts_partition + accepts_filter = self._partitions.filter_accepts_partition def one_filter_accepts(piece, filter): return all(accepts_filter(part_key, filter, level) @@ -1435,17 +1468,65 @@ def all_filters_accept(piece): return any(all(one_filter_accepts(piece, f) for f in conjunction) for conjunction in filters) - self.pieces = [p for p in self.pieces if all_filters_accept(p)] + self._pieces = [p for p in self._pieces if all_filters_accept(p)] + + @property + def pieces(self): + warnings.warn( + _DEPR_MSG.format( + "ParquetDataset.pieces", + " Specify 'use_legacy_dataset=False' while constructing the " + "ParquetDataset, and then use the '.fragments' attribute " + "instead."), + DeprecationWarning, stacklevel=2) + return self._pieces + + @property + def partitions(self): + warnings.warn( + _DEPR_MSG.format("ParquetDataset.partitions", ""), + DeprecationWarning, stacklevel=2) + return self._partitions + + @property + def memory_map(self): + warnings.warn( + _DEPR_MSG.format("ParquetDataset.memory_map", ""), + DeprecationWarning, stacklevel=2) + return self._metadata.memory_map + + @property + def read_dictionary(self): + warnings.warn( + _DEPR_MSG.format("ParquetDataset.read_dictionary", ""), + DeprecationWarning, stacklevel=2) + return self._metadata.read_dictionary - fs = property(operator.attrgetter('_metadata.fs')) - memory_map = property(operator.attrgetter('_metadata.memory_map')) - read_dictionary = property( - operator.attrgetter('_metadata.read_dictionary') + @property + def buffer_size(self): + warnings.warn( + _DEPR_MSG.format("ParquetDataset.buffer_size", ""), + DeprecationWarning, stacklevel=2) + return self._metadata.buffer_size + + _fs = property( + operator.attrgetter('_metadata.fs') ) + + @property + def fs(self): + warnings.warn( + _DEPR_MSG.format( + "ParquetDataset.fs", + " Specify 'use_legacy_dataset=False' while constructing the " + "ParquetDataset, and then use the '.filesystem' attribute " + "instead."), + DeprecationWarning, stacklevel=2) + return self._metadata.fs + common_metadata = property( operator.attrgetter('_metadata.common_metadata') ) - buffer_size = property(operator.attrgetter('_metadata.buffer_size')) def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1, @@ -1480,7 +1561,8 @@ def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1, if not fs.isfile(path): raise OSError('Passed non-file path: {}' .format(path)) - piece = ParquetDatasetPiece(path, open_file_func=open_file_func) + piece = ParquetDatasetPiece._create( + path, open_file_func=open_file_func) pieces.append(piece) return pieces, partitions, common_metadata_path, metadata_path @@ -1663,9 +1745,24 @@ def read_pandas(self, **kwargs): @property def pieces(self): - # TODO raise deprecation warning + warnings.warn( + _DEPR_MSG.format("ParquetDataset.pieces", + " Use the '.fragments' attribute instead"), + DeprecationWarning, stacklevel=2) return list(self._dataset.get_fragments()) + @property + def fragments(self): + return list(self._dataset.get_fragments()) + + @property + def files(self): + return self._dataset.files + + @property + def 
filesystem(self): + return self._dataset.filesystem + _read_table_docstring = """ {0} diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index 8cff6954cf2..81e3cdd7468 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -57,7 +57,8 @@ def test_parquet_piece_read(tempdir): path = tempdir / 'parquet_piece_read.parquet' _write_table(table, path, version='2.0') - piece1 = pq.ParquetDatasetPiece(path) + with pytest.warns(DeprecationWarning): + piece1 = pq.ParquetDatasetPiece(path) result = piece1.read() assert result.equals(table) @@ -71,7 +72,8 @@ def test_parquet_piece_open_and_get_metadata(tempdir): path = tempdir / 'parquet_piece_read.parquet' _write_table(table, path, version='2.0') - piece = pq.ParquetDatasetPiece(path) + with pytest.warns(DeprecationWarning): + piece = pq.ParquetDatasetPiece(path) table1 = piece.read() assert isinstance(table1, pa.Table) meta1 = piece.get_metadata() @@ -80,6 +82,7 @@ def test_parquet_piece_open_and_get_metadata(tempdir): assert table.equals(table1) +@pytest.mark.filterwarnings("ignore:ParquetDatasetPiece:DeprecationWarning") def test_parquet_piece_basics(): path = '/baz.parq' @@ -139,6 +142,7 @@ def test_read_partitioned_directory(tempdir, use_legacy_dataset): _partition_test_for_filesystem(fs, tempdir, use_legacy_dataset) +@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning") @pytest.mark.pandas def test_create_parquet_dataset_multi_threaded(tempdir): fs = LocalFileSystem._get_instance() @@ -979,6 +983,7 @@ def test_dataset_read_pandas(tempdir, use_legacy_dataset): tm.assert_frame_equal(result.reindex(columns=expected.columns), expected) +@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning") @pytest.mark.pandas @parametrize_legacy_dataset def test_dataset_memory_map(tempdir, use_legacy_dataset): @@ -1056,7 +1061,7 @@ def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5): def _assert_dataset_paths(dataset, paths, use_legacy_dataset): if use_legacy_dataset: - assert set(map(str, paths)) == {x.path for x in dataset.pieces} + assert set(map(str, paths)) == {x.path for x in dataset._pieces} else: paths = [str(path.as_posix()) for path in paths] assert set(paths) == set(dataset._dataset.files) @@ -1368,6 +1373,7 @@ def test_write_to_dataset_no_partitions_s3fs( path, use_legacy_dataset, filesystem=fs) +@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning") @pytest.mark.pandas @parametrize_legacy_dataset_not_supported def test_write_to_dataset_with_partitions_and_custom_filenames( @@ -1456,7 +1462,7 @@ def is_pickleable(obj): for column in dataset.metadata.schema: assert is_pickleable(column) - for piece in dataset.pieces: + for piece in dataset._pieces: assert is_pickleable(piece) metadata = piece.get_metadata() assert metadata.num_row_groups @@ -1594,6 +1600,7 @@ def test_parquet_dataset_new_filesystem(tempdir): assert result.equals(table) +@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning") def test_parquet_dataset_partitions_piece_path_with_fsspec(tempdir): # ARROW-10462 ensure that on Windows we properly use posix-style paths # as used by fsspec @@ -1608,3 +1615,33 @@ def test_parquet_dataset_partitions_piece_path_with_fsspec(tempdir): # ensure the piece path is also posix-style expected = path + "/data.parquet" assert dataset.pieces[0].path == expected + + +def test_parquet_dataset_deprecated_properties(tempdir): + table = pa.table({'a': [1, 2, 3]}) 
+ path = tempdir / 'data.parquet' + pq.write_table(table, path) + dataset = pq.ParquetDataset(path) + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"): + dataset.pieces + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.partitions"): + dataset.partitions + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.memory_map"): + dataset.memory_map + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.read_dictio"): + dataset.read_dictionary + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.buffer_size"): + dataset.buffer_size + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.fs"): + dataset.fs + + dataset2 = pq.ParquetDataset(path, use_legacy_dataset=False) + + with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"): + dataset2.pieces diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py index 4c310661fe9..3ba8a467c40 100644 --- a/python/pyarrow/tests/parquet/test_metadata.py +++ b/python/pyarrow/tests/parquet/test_metadata.py @@ -138,8 +138,8 @@ def test_parquet_metadata_lifetime(tempdir): # ARROW-6642 - ensure that chained access keeps parent objects alive table = pa.table({'a': [1, 2, 3]}) pq.write_table(table, tempdir / 'test_metadata_segfault.parquet') - dataset = pq.ParquetDataset(tempdir / 'test_metadata_segfault.parquet') - dataset.pieces[0].get_metadata().row_group(0).column(0).statistics + parquet_file = pq.ParquetFile(tempdir / 'test_metadata_segfault.parquet') + parquet_file.metadata.row_group(0).column(0).statistics @pytest.mark.pandas
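
For reference, a minimal sketch of the migration this patch steers users toward, following the wording of the deprecation messages it adds. This is illustrative only, not part of the patch, and assumes pyarrow >= 5.0 with the pyarrow.dataset-backed implementation available:

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({'a': [1, 2, 3]})
pq.write_table(table, 'data.parquet')

# Before: accessing .pieces on the legacy ParquetDataset now emits
# "'ParquetDataset.pieces' attribute is deprecated ..." (DeprecationWarning).
legacy = pq.ParquetDataset('data.parquet')
pieces = legacy.pieces  # warns

# After: opt in to the pyarrow.dataset-backed implementation and use the
# accessors added by this patch.
dataset = pq.ParquetDataset('data.parquet', use_legacy_dataset=False)
fragments = dataset.fragments    # replaces .pieces
filesystem = dataset.filesystem  # replaces .fs
files = dataset.files            # paths of the files in the dataset

Note the design choice in the patch itself: internal call sites construct pieces via the private ParquetDatasetPiece._create() staticmethod, which bypasses __init__ (via __new__ plus _init) so that only user code constructing pieces directly triggers the DeprecationWarning.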