77 changes: 60 additions & 17 deletions python/pyarrow/parquet.py
@@ -1368,7 +1368,7 @@ class ParquetDataset:

def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
metadata=None, split_row_groups=False, validate_schema=True,
filters=None, metadata_nthreads=1, read_dictionary=None,
filters=None, metadata_nthreads=None, read_dictionary=None,
memory_map=False, buffer_size=0, partitioning="hive",
use_legacy_dataset=None, pre_buffer=True,
coerce_int96_timestamp_unit=None):
@@ -1401,14 +1401,24 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,

def __init__(self, path_or_paths, filesystem=None, schema=None,
metadata=None, split_row_groups=False, validate_schema=True,
filters=None, metadata_nthreads=1, read_dictionary=None,
filters=None, metadata_nthreads=None, read_dictionary=None,
memory_map=False, buffer_size=0, partitioning="hive",
use_legacy_dataset=True, pre_buffer=True,
coerce_int96_timestamp_unit=None):
if partitioning != "hive":
raise ValueError(
'Only "hive" for hive-like partitioning is supported when '
'using use_legacy_dataset=True')
if metadata_nthreads is not None:
warnings.warn(
"Specifying the 'metadata_nthreads' argument is deprecated as "
"of pyarrow 8.0.0, and the argument will be removed in a "
"future version",
DeprecationWarning, stacklevel=2,
)
else:
metadata_nthreads = 1

self._metadata = _ParquetDatasetMetadata()
a_path = path_or_paths
if isinstance(a_path, list):
@@ -1447,7 +1457,15 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
else:
self.metadata = metadata

self.schema = schema
if schema is not None:
warnings.warn(
"Specifying the 'schema' argument with 'use_legacy_dataset="
"True' is deprecated as of pyarrow 8.0.0. You can still "
"specify it in combination with 'use_legacy_dataet=False', "
"but in that case you need to specify a pyarrow.Schema "
"instead of a ParquetSchema.",
DeprecationWarning, stacklevel=2)
self._schema = schema

self.split_row_groups = split_row_groups

@@ -1469,7 +1487,7 @@ def equals(self, other):
return False
for prop in ('paths', '_pieces', '_partitions',
'common_metadata_path', 'metadata_path',
'common_metadata', 'metadata', 'schema',
'common_metadata', 'metadata', '_schema',
'split_row_groups'):
if getattr(self, prop) != getattr(other, prop):
return False
@@ -1486,16 +1504,16 @@ def __eq__(self, other):
return NotImplemented

def validate_schemas(self):
if self.metadata is None and self.schema is None:
if self.metadata is None and self._schema is None:
if self.common_metadata is not None:
self.schema = self.common_metadata.schema
self._schema = self.common_metadata.schema
else:
self.schema = self._pieces[0].get_metadata().schema
elif self.schema is None:
self.schema = self.metadata.schema
self._schema = self._pieces[0].get_metadata().schema
elif self._schema is None:
self._schema = self.metadata.schema

# Verify schemas are all compatible
dataset_schema = self.schema.to_arrow_schema()
dataset_schema = self._schema.to_arrow_schema()
# Exclude the partition columns from the schema, they are provided
# by the path, not the DatasetPiece
if self._partitions is not None:
@@ -1612,6 +1630,18 @@ def partitions(self):
DeprecationWarning, stacklevel=2)
return self._partitions

@property
def schema(self):
warnings.warn(
_DEPR_MSG.format(
"ParquetDataset.schema",
" Specify 'use_legacy_dataset=False' while constructing the "
"ParquetDataset, and then use the '.schema' attribute "
"instead (which will return an Arrow schema instead of a "
"Parquet schema)."),
DeprecationWarning, stacklevel=2)
return self._schema

@property
def memory_map(self):
warnings.warn(
@@ -1706,15 +1736,14 @@ class _ParquetDatasetV2:
def __init__(self, path_or_paths, filesystem=None, filters=None,
partitioning="hive", read_dictionary=None, buffer_size=None,
memory_map=False, ignore_prefixes=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
coerce_int96_timestamp_unit=None, schema=None,
decryption_properties=None, **kwargs):
import pyarrow.dataset as ds

# Raise error for not supported keywords
for keyword, default in [
("schema", None), ("metadata", None),
("split_row_groups", False), ("validate_schema", True),
("metadata_nthreads", 1)]:
("metadata", None), ("split_row_groups", False),
("validate_schema", True), ("metadata_nthreads", None)]:
if keyword in kwargs and kwargs[keyword] is not default:
raise ValueError(
"Keyword '{0}' is not yet supported with the new "
@@ -1785,7 +1814,7 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
fragment = parquet_format.make_fragment(single_file, filesystem)

self._dataset = ds.FileSystemDataset(
[fragment], schema=fragment.physical_schema,
[fragment], schema=schema or fragment.physical_schema,
format=parquet_format,
filesystem=fragment.filesystem
)
@@ -1797,7 +1826,7 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
infer_dictionary=True)

self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
format=parquet_format,
schema=schema, format=parquet_format,
partitioning=partitioning,
ignore_prefixes=ignore_prefixes)

@@ -1909,6 +1938,9 @@ def partitioning(self):
Perform multi-threaded column reads.
metadata : FileMetaData
If separately computed
schema : Schema, optional
Optionally provide the Schema for the parquet dataset, in which case it
will not be inferred from the source.
{1}
use_legacy_dataset : bool, default False
By default, `read_table` uses the new Arrow Datasets API since
@@ -1959,7 +1991,7 @@ def partitioning(self):


def read_table(source, columns=None, use_threads=True, metadata=None,
use_pandas_metadata=False, memory_map=False,
schema=None, use_pandas_metadata=False, memory_map=False,
read_dictionary=None, filesystem=None, filters=None,
buffer_size=0, partitioning="hive", use_legacy_dataset=False,
ignore_prefixes=None, pre_buffer=True,
@@ -1976,6 +2008,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
try:
dataset = _ParquetDatasetV2(
source,
schema=schema,
filesystem=filesystem,
partitioning=partitioning,
memory_map=memory_map,
@@ -1999,6 +2032,11 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
"the 'partitioning' keyword is not supported when the "
"pyarrow.dataset module is not available"
)
if schema is not None:
raise ValueError(
"the 'schema' argument is not supported when the "
"pyarrow.dataset module is not available"
)
filesystem, path = _resolve_filesystem_and_path(source, filesystem)
if filesystem is not None:
source = filesystem.open_input_file(path)
@@ -2025,6 +2063,11 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
"The 'ignore_prefixes' keyword is only supported when "
"use_legacy_dataset=False")

if schema is not None:
raise ValueError(
"The 'schema' argument is only supported when "
"use_legacy_dataset=False")

if _is_path_like(source):
pf = ParquetDataset(
source, metadata=metadata, memory_map=memory_map,
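A minimal usage sketch of the new `schema` keyword added to `read_table` above; the file name and column types here are illustrative (not taken from this diff) and assume pyarrow is built with the datasets module:

import pyarrow as pa
import pyarrow.parquet as pq

# Write a small file with an int32 column (illustrative data).
table = pa.table({'a': pa.array([1, 2, 3], pa.int32())})
pq.write_table(table, 'data.parquet')

# With the default (non-legacy) reader, a caller-supplied schema takes
# precedence over the schema inferred from the file, so 'a' is read back
# as int64 here.
schema = pa.schema([('a', pa.int64())])
result = pq.read_table('data.parquet', schema=schema)
assert result.schema.equals(schema)

# The legacy code path rejects the keyword, per the ValueError added above:
# pq.read_table('data.parquet', schema=schema, use_legacy_dataset=True)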
58 changes: 51 additions & 7 deletions python/pyarrow/tests/parquet/test_dataset.py
@@ -150,7 +150,11 @@ def test_create_parquet_dataset_multi_threaded(tempdir):

manifest = pq.ParquetManifest(base_path, filesystem=fs,
metadata_nthreads=1)
dataset = pq.ParquetDataset(base_path, filesystem=fs, metadata_nthreads=16)
with pytest.warns(
DeprecationWarning, match="Specifying the 'metadata_nthreads'"
):
dataset = pq.ParquetDataset(
base_path, filesystem=fs, metadata_nthreads=16)
assert len(dataset.pieces) > 0
partitions = dataset.partitions
assert len(partitions.partition_names) > 0
@@ -783,12 +787,14 @@ def _test_read_common_metadata_files(fs, base_path):


@pytest.mark.pandas
@pytest.mark.filterwarnings("ignore:'ParquetDataset.schema:DeprecationWarning")
def test_read_common_metadata_files(tempdir):
fs = LocalFileSystem._get_instance()
_test_read_common_metadata_files(fs, tempdir)


@pytest.mark.pandas
@pytest.mark.filterwarnings("ignore:'ParquetDataset.schema:DeprecationWarning")
def test_read_metadata_files(tempdir):
fs = LocalFileSystem._get_instance()

@@ -902,7 +908,8 @@ def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
result2 = read_multiple_files(paths, metadata=metadata)
assert result2.equals(expected)

result3 = pq.ParquetDataset(dirpath, schema=metadata.schema).read()
with pytest.warns(DeprecationWarning, match="Specifying the 'schema'"):
result3 = pq.ParquetDataset(dirpath, schema=metadata.schema).read()
assert result3.equals(expected)
else:
with pytest.raises(ValueError, match="no longer supported"):
@@ -947,7 +954,8 @@ def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
mixed_paths = [bad_apple_path, paths[0]]

with pytest.raises(ValueError):
read_multiple_files(mixed_paths, schema=bad_meta.schema)
with pytest.warns(DeprecationWarning, match="Specifying the 'schema'"):
read_multiple_files(mixed_paths, schema=bad_meta.schema)

with pytest.raises(ValueError):
read_multiple_files(mixed_paths)
@@ -1195,6 +1203,7 @@ def test_empty_directory(tempdir, use_legacy_dataset):
assert result.num_columns == 0


@pytest.mark.filterwarnings("ignore:'ParquetDataset.schema:DeprecationWarning")
def _test_write_to_dataset_with_partitions(base_path,
use_legacy_dataset=True,
filesystem=None,
@@ -1236,7 +1245,8 @@ def _test_write_to_dataset_with_partitions(base_path,
use_legacy_dataset=use_legacy_dataset)
# ARROW-2209: Ensure the dataset schema also includes the partition columns
if use_legacy_dataset:
dataset_cols = set(dataset.schema.to_arrow_schema().names)
with pytest.warns(DeprecationWarning, match="'ParquetDataset.schema'"):
dataset_cols = set(dataset.schema.to_arrow_schema().names)
else:
# NB schema property is an arrow and not parquet schema
dataset_cols = set(dataset.schema.names)
@@ -1541,12 +1551,43 @@ def test_dataset_read_dictionary(tempdir, use_legacy_dataset):
assert c1.equals(ex_chunks[0])


@pytest.mark.pandas
def test_read_table_schema(tempdir):
# test that schema keyword is passed through in read_table
table = pa.table({'a': pa.array([1, 2, 3], pa.int32())})
pq.write_table(table, tempdir / "data1.parquet")
pq.write_table(table, tempdir / "data2.parquet")

schema = pa.schema([('a', 'int64')])

# reading single file (which is special cased in the code)
result = pq.read_table(tempdir / "data1.parquet", schema=schema)
expected = pa.table({'a': [1, 2, 3]}, schema=schema)
assert result.equals(expected)

# reading multiple files
result = pq.read_table(tempdir, schema=schema)
expected = pa.table({'a': [1, 2, 3, 1, 2, 3]}, schema=schema)
assert result.equals(expected)

# don't allow it with the legacy reader
with pytest.raises(
ValueError, match="The 'schema' argument is only supported"
):
pq.read_table(tempdir / "data.parquet", schema=schema,
use_legacy_dataset=True)

# using ParquetDataset directory with non-legacy implementation
result = pq.ParquetDataset(
tempdir, schema=schema, use_legacy_dataset=False
)
expected = pa.table({'a': [1, 2, 3, 1, 2, 3]}, schema=schema)
assert result.read().equals(expected)


@pytest.mark.dataset
def test_dataset_unsupported_keywords():

with pytest.raises(ValueError, match="not yet supported with the new"):
pq.ParquetDataset("", use_legacy_dataset=False, schema=pa.schema([]))

with pytest.raises(ValueError, match="not yet supported with the new"):
pq.ParquetDataset("", use_legacy_dataset=False, metadata=pa.schema([]))

@@ -1653,6 +1694,9 @@ def test_parquet_dataset_deprecated_properties(tempdir):
with pytest.warns(DeprecationWarning, match="'ParquetDataset.fs"):
dataset.fs

with pytest.warns(DeprecationWarning, match="'ParquetDataset.schema'"):
dataset.schema

dataset2 = pq.ParquetDataset(path, use_legacy_dataset=False)

with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
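As a rough sketch of the deprecations exercised by the tests above (file name hypothetical; behavior as introduced here for pyarrow 8.0.0): accessing `.schema` on a legacy ParquetDataset now warns and still returns a ParquetSchema, while the new implementation exposes an Arrow schema directly.

import warnings
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({'a': [1, 2, 3]})
pq.write_table(table, 'data.parquet')

# Legacy dataset: '.schema' still works, but emits the DeprecationWarning
# added by the property in this PR.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    legacy = pq.ParquetDataset('data.parquet')
    _ = legacy.schema
assert any(issubclass(w.category, DeprecationWarning) for w in caught)

# New implementation: '.schema' is an Arrow schema and is not deprecated.
dataset = pq.ParquetDataset('data.parquet', use_legacy_dataset=False)
assert isinstance(dataset.schema, pa.Schema)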