149 changes: 123 additions & 26 deletions python/pyarrow/parquet.py
@@ -27,6 +27,7 @@
import re
import operator
import urllib.parse
import warnings

import pyarrow as pa
import pyarrow.lib as lib
@@ -705,11 +706,17 @@ def _get_pandas_index_columns(keyvalues):

class ParquetDatasetPiece:
"""
A single chunk of a potentially larger Parquet dataset to read.
DEPRECATED: A single chunk of a potentially larger Parquet dataset to read.

The arguments will indicate to read either a single row group or all row
groups, and whether to add partition keys to the resulting pyarrow.Table.

.. deprecated:: 5.0
Directly constructing a ``ParquetDatasetPiece`` is deprecated, as well
as accessing the pieces of a ``ParquetDataset`` object. Specify
``use_legacy_dataset=False`` when constructing the ``ParquetDataset``
and use the ``ParquetDataset.fragments`` attribute instead.

Parameters
----------
path : str or pathlib.Path
@@ -724,6 +731,23 @@ class ParquetDatasetPiece:

def __init__(self, path, open_file_func=partial(open, mode='rb'),
file_options=None, row_group=None, partition_keys=None):
warnings.warn(
"ParquetDatasetPiece is deprecated as of pyarrow 5.0.0 and will "
"be removed in a future version.",
DeprecationWarning, stacklevel=2)
self._init(
path, open_file_func, file_options, row_group, partition_keys)

@staticmethod
def _create(path, open_file_func=partial(open, mode='rb'),
file_options=None, row_group=None, partition_keys=None):
self = ParquetDatasetPiece.__new__(ParquetDatasetPiece)
self._init(
path, open_file_func, file_options, row_group, partition_keys)
return self

def _init(self, path, open_file_func, file_options, row_group,
partition_keys):
self.path = _stringify_path(path)
self.open_file_func = open_file_func
self.row_group = row_group
@@ -1106,8 +1130,8 @@ def _parse_partition(self, dirname):

def _push_pieces(self, files, part_keys):
self.pieces.extend([
ParquetDatasetPiece(path, partition_keys=part_keys,
open_file_func=self.open_file_func)
ParquetDatasetPiece._create(path, partition_keys=part_keys,
open_file_func=self.open_file_func)
for path in files
])

@@ -1153,6 +1177,12 @@ def _open_dataset_file(dataset, path, meta=None):
)


_DEPR_MSG = (
"'{}' attribute is deprecated as of pyarrow 5.0.0 and will be removed "
"in a future version.{}"
)


_read_docstring_common = """\
read_dictionary : list, default None
List of names or column paths (for nested types) to read directly
@@ -1279,16 +1309,16 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
self._metadata.memory_map = memory_map
self._metadata.buffer_size = buffer_size

(self.pieces,
self.partitions,
(self._pieces,
self._partitions,
self.common_metadata_path,
self.metadata_path) = _make_manifest(
path_or_paths, self.fs, metadata_nthreads=metadata_nthreads,
path_or_paths, self._fs, metadata_nthreads=metadata_nthreads,
open_file_func=partial(_open_dataset_file, self._metadata)
)

if self.common_metadata_path is not None:
with self.fs.open(self.common_metadata_path) as f:
with self._fs.open(self.common_metadata_path) as f:
self._metadata.common_metadata = read_metadata(
f,
memory_map=memory_map
@@ -1297,7 +1327,7 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
self._metadata.common_metadata = None

if metadata is None and self.metadata_path is not None:
with self.fs.open(self.metadata_path) as f:
with self._fs.open(self.metadata_path) as f:
self.metadata = read_metadata(f, memory_map=memory_map)
else:
self.metadata = metadata
@@ -1320,14 +1350,17 @@ def equals(self, other):
if not isinstance(other, ParquetDataset):
raise TypeError('`other` must be an instance of ParquetDataset')

if self.fs.__class__ != other.fs.__class__:
if self._fs.__class__ != other._fs.__class__:
return False
for prop in ('paths', 'memory_map', 'pieces', 'partitions',
for prop in ('paths', '_pieces', '_partitions',
'common_metadata_path', 'metadata_path',
'common_metadata', 'metadata', 'schema',
'buffer_size', 'split_row_groups'):
'split_row_groups'):
if getattr(self, prop) != getattr(other, prop):
return False
for prop in ('memory_map', 'buffer_size'):
if getattr(self._metadata, prop) != getattr(other._metadata, prop):
return False

return True

@@ -1342,21 +1375,21 @@ def validate_schemas(self):
if self.common_metadata is not None:
self.schema = self.common_metadata.schema
else:
self.schema = self.pieces[0].get_metadata().schema
self.schema = self._pieces[0].get_metadata().schema
elif self.schema is None:
self.schema = self.metadata.schema

# Verify schemas are all compatible
dataset_schema = self.schema.to_arrow_schema()
# Exclude the partition columns from the schema, they are provided
# by the path, not the DatasetPiece
if self.partitions is not None:
for partition_name in self.partitions.partition_names:
if self._partitions is not None:
for partition_name in self._partitions.partition_names:
if dataset_schema.get_field_index(partition_name) != -1:
field_idx = dataset_schema.get_field_index(partition_name)
dataset_schema = dataset_schema.remove(field_idx)

for piece in self.pieces:
for piece in self._pieces:
file_metadata = piece.get_metadata()
file_schema = file_metadata.schema.to_arrow_schema()
if not dataset_schema.equals(file_schema, check_metadata=False):
@@ -1384,9 +1417,9 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
Content of the file as a table (of columns).
"""
tables = []
for piece in self.pieces:
for piece in self._pieces:
table = piece.read(columns=columns, use_threads=use_threads,
partitions=self.partitions,
partitions=self._partitions,
use_pandas_metadata=use_pandas_metadata)
tables.append(table)

@@ -1425,7 +1458,7 @@ def _get_common_pandas_metadata(self):
return keyvalues.get(b'pandas', None)

def _filter(self, filters):
accepts_filter = self.partitions.filter_accepts_partition
accepts_filter = self._partitions.filter_accepts_partition

def one_filter_accepts(piece, filter):
return all(accepts_filter(part_key, filter, level)
@@ -1435,17 +1468,65 @@ def all_filters_accept(piece):
return any(all(one_filter_accepts(piece, f) for f in conjunction)
for conjunction in filters)

self.pieces = [p for p in self.pieces if all_filters_accept(p)]
self._pieces = [p for p in self._pieces if all_filters_accept(p)]

@property
def pieces(self):
warnings.warn(
_DEPR_MSG.format(
"ParquetDataset.pieces",
" Specify 'use_legacy_dataset=False' while constructing the "
"ParquetDataset, and then use the '.fragments' attribute "
"instead."),
DeprecationWarning, stacklevel=2)
return self._pieces

@property
def partitions(self):
warnings.warn(
_DEPR_MSG.format("ParquetDataset.partitions", ""),
DeprecationWarning, stacklevel=2)
return self._partitions

@property
def memory_map(self):
warnings.warn(
_DEPR_MSG.format("ParquetDataset.memory_map", ""),
DeprecationWarning, stacklevel=2)
return self._metadata.memory_map

@property
def read_dictionary(self):
warnings.warn(
_DEPR_MSG.format("ParquetDataset.read_dictionary", ""),
DeprecationWarning, stacklevel=2)
return self._metadata.read_dictionary

fs = property(operator.attrgetter('_metadata.fs'))
memory_map = property(operator.attrgetter('_metadata.memory_map'))
read_dictionary = property(
operator.attrgetter('_metadata.read_dictionary')
@property
def buffer_size(self):
warnings.warn(
_DEPR_MSG.format("ParquetDataset.buffer_size", ""),
DeprecationWarning, stacklevel=2)
return self._metadata.buffer_size

_fs = property(
operator.attrgetter('_metadata.fs')
)

@property
def fs(self):
warnings.warn(
_DEPR_MSG.format(
"ParquetDataset.fs",
" Specify 'use_legacy_dataset=False' while constructing the "
"ParquetDataset, and then use the '.filesystem' attribute "
"instead."),
DeprecationWarning, stacklevel=2)
return self._metadata.fs

common_metadata = property(
operator.attrgetter('_metadata.common_metadata')
)
buffer_size = property(operator.attrgetter('_metadata.buffer_size'))


def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1,
@@ -1480,7 +1561,8 @@ def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1,
if not fs.isfile(path):
raise OSError('Passed non-file path: {}'
.format(path))
piece = ParquetDatasetPiece(path, open_file_func=open_file_func)
piece = ParquetDatasetPiece._create(
path, open_file_func=open_file_func)
pieces.append(piece)

return pieces, partitions, common_metadata_path, metadata_path
@@ -1663,9 +1745,24 @@ def read_pandas(self, **kwargs):

@property
def pieces(self):
# TODO raise deprecation warning
warnings.warn(
_DEPR_MSG.format("ParquetDataset.pieces",
" Use the '.fragments' attribute instead"),
DeprecationWarning, stacklevel=2)
return list(self._dataset.get_fragments())

@property
def fragments(self):
return list(self._dataset.get_fragments())

@property
def files(self):
return self._dataset.files

@property
def filesystem(self):
return self._dataset.filesystem


_read_table_docstring = """
{0}
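Summary of the parquet.py changes above: constructing a ParquetDatasetPiece directly and reading the legacy ParquetDataset attributes (pieces, partitions, memory_map, read_dictionary, buffer_size, fs) now emit DeprecationWarning, while the use_legacy_dataset=False code path gains fragments, files and filesystem properties. The following is a minimal migration sketch, assuming a local directory of Parquet files at the illustrative path "dataset_root/":

import pyarrow.parquet as pq

# Legacy behaviour after this patch: the attribute access below still works
# but emits DeprecationWarning (path is an assumption for illustration).
legacy = pq.ParquetDataset("dataset_root/")
legacy_pieces = legacy.pieces        # warns, backed by the private _pieces

# Replacement suggested by the deprecation messages: the new dataset
# implementation and the properties added in this diff.
dataset = pq.ParquetDataset("dataset_root/", use_legacy_dataset=False)
fragments = dataset.fragments        # replaces .pieces
files = dataset.files                # data file paths in the dataset
filesystem = dataset.filesystem      # replaces .fs
table = dataset.read()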
45 changes: 41 additions & 4 deletions python/pyarrow/tests/parquet/test_dataset.py
@@ -57,7 +57,8 @@ def test_parquet_piece_read(tempdir):
path = tempdir / 'parquet_piece_read.parquet'
_write_table(table, path, version='2.0')

piece1 = pq.ParquetDatasetPiece(path)
with pytest.warns(DeprecationWarning):
piece1 = pq.ParquetDatasetPiece(path)

result = piece1.read()
assert result.equals(table)
@@ -71,7 +72,8 @@ def test_parquet_piece_open_and_get_metadata(tempdir):
path = tempdir / 'parquet_piece_read.parquet'
_write_table(table, path, version='2.0')

piece = pq.ParquetDatasetPiece(path)
with pytest.warns(DeprecationWarning):
piece = pq.ParquetDatasetPiece(path)
table1 = piece.read()
assert isinstance(table1, pa.Table)
meta1 = piece.get_metadata()
@@ -80,6 +82,7 @@ def test_parquet_piece_open_and_get_metadata(tempdir):
assert table.equals(table1)


@pytest.mark.filterwarnings("ignore:ParquetDatasetPiece:DeprecationWarning")
def test_parquet_piece_basics():
path = '/baz.parq'

@@ -139,6 +142,7 @@ def test_read_partitioned_directory(tempdir, use_legacy_dataset):
_partition_test_for_filesystem(fs, tempdir, use_legacy_dataset)


@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
@pytest.mark.pandas
def test_create_parquet_dataset_multi_threaded(tempdir):
fs = LocalFileSystem._get_instance()
@@ -979,6 +983,7 @@ def test_dataset_read_pandas(tempdir, use_legacy_dataset):
tm.assert_frame_equal(result.reindex(columns=expected.columns), expected)


@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_dataset_memory_map(tempdir, use_legacy_dataset):
@@ -1056,7 +1061,7 @@ def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):

def _assert_dataset_paths(dataset, paths, use_legacy_dataset):
if use_legacy_dataset:
assert set(map(str, paths)) == {x.path for x in dataset.pieces}
assert set(map(str, paths)) == {x.path for x in dataset._pieces}
else:
paths = [str(path.as_posix()) for path in paths]
assert set(paths) == set(dataset._dataset.files)
@@ -1368,6 +1373,7 @@ def test_write_to_dataset_no_partitions_s3fs(
path, use_legacy_dataset, filesystem=fs)


@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
@pytest.mark.pandas
@parametrize_legacy_dataset_not_supported
def test_write_to_dataset_with_partitions_and_custom_filenames(
@@ -1456,7 +1462,7 @@ def is_pickleable(obj):
for column in dataset.metadata.schema:
assert is_pickleable(column)

for piece in dataset.pieces:
for piece in dataset._pieces:
assert is_pickleable(piece)
metadata = piece.get_metadata()
assert metadata.num_row_groups
@@ -1594,6 +1600,7 @@ def test_parquet_dataset_new_filesystem(tempdir):
assert result.equals(table)


@pytest.mark.filterwarnings("ignore:'ParquetDataset:DeprecationWarning")
def test_parquet_dataset_partitions_piece_path_with_fsspec(tempdir):
# ARROW-10462 ensure that on Windows we properly use posix-style paths
# as used by fsspec
@@ -1608,3 +1615,33 @@ def test_parquet_dataset_partitions_piece_path_with_fsspec(tempdir):
# ensure the piece path is also posix-style
expected = path + "/data.parquet"
assert dataset.pieces[0].path == expected


def test_parquet_dataset_deprecated_properties(tempdir):
table = pa.table({'a': [1, 2, 3]})
path = tempdir / 'data.parquet'
pq.write_table(table, path)
dataset = pq.ParquetDataset(path)

with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
dataset.pieces

with pytest.warns(DeprecationWarning, match="'ParquetDataset.partitions"):
dataset.partitions

with pytest.warns(DeprecationWarning, match="'ParquetDataset.memory_map"):
dataset.memory_map

with pytest.warns(DeprecationWarning, match="'ParquetDataset.read_dictio"):
dataset.read_dictionary

with pytest.warns(DeprecationWarning, match="'ParquetDataset.buffer_size"):
dataset.buffer_size

with pytest.warns(DeprecationWarning, match="'ParquetDataset.fs"):
dataset.fs

dataset2 = pq.ParquetDataset(path, use_legacy_dataset=False)

with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
dataset2.pieces
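The tests above silence the new warnings with pytest.mark.filterwarnings. Downstream code that cannot migrate immediately could install a comparable filter with the standard warnings module; this is a hedged sketch, and the message regexes simply mirror the warning texts introduced in this patch:

import warnings

# Ignore the deprecated-piece warning ("ParquetDatasetPiece is deprecated
# as of pyarrow 5.0.0 ...").
warnings.filterwarnings(
    "ignore",
    message="ParquetDatasetPiece is deprecated",
    category=DeprecationWarning,
)

# Ignore the deprecated-attribute warnings built from _DEPR_MSG, e.g.
# "'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 ...".
warnings.filterwarnings(
    "ignore",
    message=r"'ParquetDataset\.",
    category=DeprecationWarning,
)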