From 7f664543477077d3ced6533b51566107df8fab41 Mon Sep 17 00:00:00 2001
From: Joris Van den Bossche
Date: Tue, 8 Mar 2022 16:35:49 +0100
Subject: [PATCH 1/3] deprecate schema keyword+attribute for legacy
 ParquetDataset + expose it in the non-legacy read_table

---
 python/pyarrow/parquet.py                    | 63 +++++++++++++++-----
 python/pyarrow/tests/parquet/test_dataset.py | 52 ++++++++++++++--
 2 files changed, 94 insertions(+), 21 deletions(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 5589dd14eb2..2cacb191603 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1447,7 +1447,15 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
         else:
             self.metadata = metadata
 
-        self.schema = schema
+        if schema is not None:
+            warnings.warn(
+                "Specifying the 'schema' keyword with 'use_legacy_dataset="
+                "True' is deprecated as of pyarrow 8.0.0. You can still "
+                "specify it in combination with 'use_legacy_dataset=False', "
+                "but in that case you need to specify a pyarrow.Schema "
+                "instead of a ParquetSchema.",
+                DeprecationWarning, stacklevel=2)
+        self._schema = schema
 
         self.split_row_groups = split_row_groups
 
@@ -1469,7 +1477,7 @@ def equals(self, other):
             return False
         for prop in ('paths', '_pieces', '_partitions',
                      'common_metadata_path', 'metadata_path',
-                     'common_metadata', 'metadata', 'schema',
+                     'common_metadata', 'metadata', '_schema',
                      'split_row_groups'):
             if getattr(self, prop) != getattr(other, prop):
                 return False
@@ -1486,16 +1494,16 @@ def __eq__(self, other):
         return NotImplemented
 
     def validate_schemas(self):
-        if self.metadata is None and self.schema is None:
+        if self.metadata is None and self._schema is None:
             if self.common_metadata is not None:
-                self.schema = self.common_metadata.schema
+                self._schema = self.common_metadata.schema
             else:
-                self.schema = self._pieces[0].get_metadata().schema
-        elif self.schema is None:
-            self.schema = self.metadata.schema
+                self._schema = self._pieces[0].get_metadata().schema
+        elif self._schema is None:
+            self._schema = self.metadata.schema
 
         # Verify schemas are all compatible
-        dataset_schema = self.schema.to_arrow_schema()
+        dataset_schema = self._schema.to_arrow_schema()
         # Exclude the partition columns from the schema, they are provided
         # by the path, not the DatasetPiece
         if self._partitions is not None:
@@ -1612,6 +1620,18 @@ def partitions(self):
             DeprecationWarning, stacklevel=2)
         return self._partitions
 
+    @property
+    def schema(self):
+        warnings.warn(
+            _DEPR_MSG.format(
+                "ParquetDataset.schema",
+                " Specify 'use_legacy_dataset=False' while constructing the "
+                "ParquetDataset, and then use the '.schema' attribute "
+                "instead (which will return an Arrow schema instead of a "
+                "Parquet schema)."),
+            DeprecationWarning, stacklevel=2)
+        return self._schema
+
     @property
     def memory_map(self):
         warnings.warn(
@@ -1706,15 +1726,14 @@ class _ParquetDatasetV2:
     def __init__(self, path_or_paths, filesystem=None, filters=None,
                  partitioning="hive", read_dictionary=None, buffer_size=None,
                  memory_map=False, ignore_prefixes=None, pre_buffer=True,
-                 coerce_int96_timestamp_unit=None,
+                 coerce_int96_timestamp_unit=None, schema=None,
                  decryption_properties=None, **kwargs):
         import pyarrow.dataset as ds
 
         # Raise error for not supported keywords
         for keyword, default in [
-                ("schema", None), ("metadata", None),
-                ("split_row_groups", False), ("validate_schema", True),
-                ("metadata_nthreads", 1)]:
+                ("metadata", None), ("split_row_groups", False),
+                ("validate_schema", True), ("metadata_nthreads", 1)]:
             if keyword in kwargs and kwargs[keyword] is not default:
                 raise ValueError(
                     "Keyword '{0}' is not yet supported with the new "
@@ -1785,7 +1804,7 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
             fragment = parquet_format.make_fragment(single_file, filesystem)
 
             self._dataset = ds.FileSystemDataset(
-                [fragment], schema=fragment.physical_schema,
+                [fragment], schema=schema or fragment.physical_schema,
                 format=parquet_format, filesystem=fragment.filesystem
             )
 
@@ -1797,7 +1816,7 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
                                 infer_dictionary=True)
 
         self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
-                                   format=parquet_format,
+                                   schema=schema, format=parquet_format,
                                    partitioning=partitioning,
                                    ignore_prefixes=ignore_prefixes)
 
@@ -1909,6 +1928,9 @@ def partitioning(self):
     Perform multi-threaded column reads.
 metadata : FileMetaData
     If separately computed
+schema : Schema, optional
+    Optionally provide the Schema for the parquet dataset, in which case it
+    will not be inferred from the source.
 {1}
 use_legacy_dataset : bool, default False
     By default, `read_table` uses the new Arrow Datasets API since
@@ -1959,7 +1981,7 @@
 
 
 def read_table(source, columns=None, use_threads=True, metadata=None,
-               use_pandas_metadata=False, memory_map=False,
+               schema=None, use_pandas_metadata=False, memory_map=False,
                read_dictionary=None, filesystem=None, filters=None,
                buffer_size=0, partitioning="hive", use_legacy_dataset=False,
                ignore_prefixes=None, pre_buffer=True,
@@ -1976,6 +1998,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
         try:
             dataset = _ParquetDatasetV2(
                 source,
+                schema=schema,
                 filesystem=filesystem,
                 partitioning=partitioning,
                 memory_map=memory_map,
@@ -1999,6 +2022,11 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
                     "the 'partitioning' keyword is not supported when the "
                     "pyarrow.dataset module is not available"
                 )
+            if schema is not None:
+                raise ValueError(
+                    "the 'schema' keyword is not supported when the "
+                    "pyarrow.dataset module is not available"
+                )
             filesystem, path = _resolve_filesystem_and_path(source, filesystem)
             if filesystem is not None:
                 source = filesystem.open_input_file(path)
@@ -2025,6 +2053,11 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
             "The 'ignore_prefixes' keyword is only supported when "
             "use_legacy_dataset=False")
 
+    if schema is not None:
+        raise ValueError(
+            "The 'schema' keyword is only supported when "
+            "use_legacy_dataset=False")
+
     if _is_path_like(source):
         pf = ParquetDataset(
             source, metadata=metadata, memory_map=memory_map,
diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py
index 11912eca8da..f6766e2417b 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -783,12 +783,14 @@ def _test_read_common_metadata_files(fs, base_path):
 
 
 @pytest.mark.pandas
+@pytest.mark.filterwarnings("ignore:'ParquetDataset.schema:DeprecationWarning")
 def test_read_common_metadata_files(tempdir):
     fs = LocalFileSystem._get_instance()
     _test_read_common_metadata_files(fs, tempdir)
 
 
 @pytest.mark.pandas
+@pytest.mark.filterwarnings("ignore:'ParquetDataset.schema:DeprecationWarning")
 def test_read_metadata_files(tempdir):
     fs = LocalFileSystem._get_instance()
 
@@ -902,7 +904,8 @@ def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
         result2 = read_multiple_files(paths, metadata=metadata)
         assert result2.equals(expected)
 
-        result3 = pq.ParquetDataset(dirpath, schema=metadata.schema).read()
+        with pytest.warns(DeprecationWarning, match="Specifying the 'schema'"):
+            result3 = pq.ParquetDataset(dirpath, schema=metadata.schema).read()
         assert result3.equals(expected)
     else:
         with pytest.raises(ValueError, match="no longer supported"):
@@ -947,7 +950,8 @@ def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
     mixed_paths = [bad_apple_path, paths[0]]
 
     with pytest.raises(ValueError):
-        read_multiple_files(mixed_paths, schema=bad_meta.schema)
+        with pytest.warns(DeprecationWarning, match="Specifying the 'schema'"):
+            read_multiple_files(mixed_paths, schema=bad_meta.schema)
 
     with pytest.raises(ValueError):
         read_multiple_files(mixed_paths)
@@ -1195,6 +1199,7 @@ def test_empty_directory(tempdir, use_legacy_dataset):
     assert result.num_columns == 0
 
 
+@pytest.mark.filterwarnings("ignore:'ParquetDataset.schema:DeprecationWarning")
 def _test_write_to_dataset_with_partitions(base_path,
                                            use_legacy_dataset=True,
                                            filesystem=None,
@@ -1236,7 +1241,8 @@ def _test_write_to_dataset_with_partitions(base_path,
                                 use_legacy_dataset=use_legacy_dataset)
     # ARROW-2209: Ensure the dataset schema also includes the partition columns
     if use_legacy_dataset:
-        dataset_cols = set(dataset.schema.to_arrow_schema().names)
+        with pytest.warns(DeprecationWarning, match="'ParquetDataset.schema'"):
+            dataset_cols = set(dataset.schema.to_arrow_schema().names)
     else:
         # NB schema property is an arrow and not parquet schema
        dataset_cols = set(dataset.schema.names)
@@ -1541,12 +1547,43 @@ def test_dataset_read_dictionary(tempdir, use_legacy_dataset):
     assert c1.equals(ex_chunks[0])
 
 
+@pytest.mark.pandas
+def test_read_table_schema(tempdir):
+    # test that schema keyword is passed through in read_table
+    table = pa.table({'a': pa.array([1, 2, 3], pa.int32())})
+    pq.write_table(table, tempdir / "data1.parquet")
+    pq.write_table(table, tempdir / "data2.parquet")
+
+    schema = pa.schema([('a', 'int64')])
+
+    # reading single file (which is special cased in the code)
+    result = pq.read_table(tempdir / "data1.parquet", schema=schema)
+    expected = pa.table({'a': [1, 2, 3]}, schema=schema)
+    assert result.equals(expected)
+
+    # reading multiple files
+    result = pq.read_table(tempdir, schema=schema)
+    expected = pa.table({'a': [1, 2, 3, 1, 2, 3]}, schema=schema)
+    assert result.equals(expected)
+
+    # don't allow it with the legacy reader
+    with pytest.raises(
+        ValueError, match="The 'schema' keyword is only supported"
+    ):
+        pq.read_table(tempdir / "data.parquet", schema=schema,
+                      use_legacy_dataset=True)
+
+    # using ParquetDataset directly with non-legacy implementation
+    result = pq.ParquetDataset(
+        tempdir, schema=schema, use_legacy_dataset=False
+    )
+    expected = pa.table({'a': [1, 2, 3, 1, 2, 3]}, schema=schema)
+    assert result.read().equals(expected)
+
+
 @pytest.mark.dataset
 def test_dataset_unsupported_keywords():
 
-    with pytest.raises(ValueError, match="not yet supported with the new"):
-        pq.ParquetDataset("", use_legacy_dataset=False, schema=pa.schema([]))
-
     with pytest.raises(ValueError, match="not yet supported with the new"):
         pq.ParquetDataset("", use_legacy_dataset=False, metadata=pa.schema([]))
 
@@ -1653,6 +1690,9 @@ def test_parquet_dataset_deprecated_properties(tempdir):
     with pytest.warns(DeprecationWarning, match="'ParquetDataset.fs"):
         dataset.fs
 
+    with pytest.warns(DeprecationWarning, match="'ParquetDataset.schema'"):
+        dataset.schema
+
     dataset2 = pq.ParquetDataset(path, use_legacy_dataset=False)
 
     with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):

From 6f255307ad814863b78859e1335d6df1d87ddc9b Mon Sep 17 00:00:00 2001
From: Joris Van den Bossche
Date: Tue, 8 Mar 2022 16:44:21 +0100
Subject: [PATCH 2/3] deprecate metadata_nthreads keyword

---
 python/pyarrow/parquet.py                    | 16 +++++++++++++---
 python/pyarrow/tests/parquet/test_dataset.py |  6 +++++-
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 2cacb191603..ac620cc4cbf 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1368,7 +1368,7 @@ class ParquetDataset:
 
     def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
                 metadata=None, split_row_groups=False, validate_schema=True,
-                filters=None, metadata_nthreads=1, read_dictionary=None,
+                filters=None, metadata_nthreads=None, read_dictionary=None,
                 memory_map=False, buffer_size=0, partitioning="hive",
                 use_legacy_dataset=None, pre_buffer=True,
                 coerce_int96_timestamp_unit=None):
@@ -1401,7 +1401,7 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
 
     def __init__(self, path_or_paths, filesystem=None, schema=None,
                  metadata=None, split_row_groups=False, validate_schema=True,
-                 filters=None, metadata_nthreads=1, read_dictionary=None,
+                 filters=None, metadata_nthreads=None, read_dictionary=None,
                  memory_map=False, buffer_size=0, partitioning="hive",
                  use_legacy_dataset=True, pre_buffer=True,
                  coerce_int96_timestamp_unit=None):
@@ -1409,6 +1409,16 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
             raise ValueError(
                 'Only "hive" for hive-like partitioning is supported when '
                 'using use_legacy_dataset=True')
+        if metadata_nthreads is not None:
+            warnings.warn(
+                "Specifying the 'metadata_nthreads' keyword is deprecated as "
+                "of pyarrow 8.0.0, and the keyword will be removed in a "
+                "future version",
+                DeprecationWarning, stacklevel=2,
+            )
+        else:
+            metadata_nthreads = 1
+
         self._metadata = _ParquetDatasetMetadata()
         a_path = path_or_paths
         if isinstance(a_path, list):
@@ -1733,7 +1743,7 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
         # Raise error for not supported keywords
         for keyword, default in [
                 ("metadata", None), ("split_row_groups", False),
-                ("validate_schema", True), ("metadata_nthreads", 1)]:
+                ("validate_schema", True), ("metadata_nthreads", None)]:
             if keyword in kwargs and kwargs[keyword] is not default:
                 raise ValueError(
                     "Keyword '{0}' is not yet supported with the new "
diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py
index f6766e2417b..5ee9ae5fd84 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -150,7 +150,11 @@ def test_create_parquet_dataset_multi_threaded(tempdir):
 
     manifest = pq.ParquetManifest(base_path, filesystem=fs,
                                   metadata_nthreads=1)
-    dataset = pq.ParquetDataset(base_path, filesystem=fs, metadata_nthreads=16)
+    with pytest.warns(
+        DeprecationWarning, match="Specifying the 'metadata_nthreads'"
+    ):
+        dataset = pq.ParquetDataset(
+            base_path, filesystem=fs, metadata_nthreads=16)
     assert len(dataset.pieces) > 0
     partitions = dataset.partitions
     assert len(partitions.partition_names) > 0

From 6ebd21646c6edc76cb7a88fda7e25bbfa8d62b5f Mon Sep 17 00:00:00 2001
From: Joris Van den Bossche
Date: Tue, 15 Mar 2022 20:24:23 +0100
Subject: [PATCH 3/3] keyword -> argument

---
 python/pyarrow/parquet.py                    | 10 +++++-----
 python/pyarrow/tests/parquet/test_dataset.py |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index ac620cc4cbf..1bf51168dcd 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1411,8 +1411,8 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
                 'using use_legacy_dataset=True')
         if metadata_nthreads is not None:
             warnings.warn(
-                "Specifying the 'metadata_nthreads' keyword is deprecated as "
-                "of pyarrow 8.0.0, and the keyword will be removed in a "
+                "Specifying the 'metadata_nthreads' argument is deprecated as "
+                "of pyarrow 8.0.0, and the argument will be removed in a "
                 "future version",
                 DeprecationWarning, stacklevel=2,
             )
@@ -1459,7 +1459,7 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
 
         if schema is not None:
             warnings.warn(
-                "Specifying the 'schema' keyword with 'use_legacy_dataset="
+                "Specifying the 'schema' argument with 'use_legacy_dataset="
                 "True' is deprecated as of pyarrow 8.0.0. You can still "
                 "specify it in combination with 'use_legacy_dataset=False', "
                 "but in that case you need to specify a pyarrow.Schema "
@@ -2034,7 +2034,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
             )
             if schema is not None:
                 raise ValueError(
-                    "the 'schema' keyword is not supported when the "
+                    "the 'schema' argument is not supported when the "
                     "pyarrow.dataset module is not available"
                 )
             filesystem, path = _resolve_filesystem_and_path(source, filesystem)
@@ -2065,7 +2065,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
 
     if schema is not None:
         raise ValueError(
-            "The 'schema' keyword is only supported when "
+            "The 'schema' argument is only supported when "
             "use_legacy_dataset=False")
 
     if _is_path_like(source):
diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py
index 5ee9ae5fd84..e0a7667cc18 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -1572,7 +1572,7 @@ def test_read_table_schema(tempdir):
 
     # don't allow it with the legacy reader
     with pytest.raises(
-        ValueError, match="The 'schema' keyword is only supported"
+        ValueError, match="The 'schema' argument is only supported"
    ):
         pq.read_table(tempdir / "data.parquet", schema=schema,
                       use_legacy_dataset=True)
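
Usage sketch of the behavior this series adds, mirroring test_read_table_schema
above. The temporary directory and file names are illustrative assumptions, not
paths from the patches:

    import pathlib
    import tempfile

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Hypothetical scratch directory holding two small int32 files.
    tmp = pathlib.Path(tempfile.mkdtemp())
    table = pa.table({'a': pa.array([1, 2, 3], pa.int32())})
    pq.write_table(table, tmp / "data1.parquet")
    pq.write_table(table, tmp / "data2.parquet")

    # Passing an explicit schema to read_table skips schema inference and
    # casts the data on read (int32 files read back as int64 here).
    schema = pa.schema([('a', 'int64')])
    result = pq.read_table(tmp, schema=schema)
    assert result.schema.equals(schema)
    assert result.column('a').to_pylist() == [1, 2, 3, 1, 2, 3]

    # The same argument works on ParquetDataset with use_legacy_dataset=False;
    # combining it with use_legacy_dataset=True raises a ValueError, and the
    # legacy ParquetDataset(schema=...) path now emits a DeprecationWarning.
    dataset = pq.ParquetDataset(tmp, schema=schema, use_legacy_dataset=False)
    assert dataset.read().schema.equals(schema)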