diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py
index 33094dabe68..867babdaf81 100644
--- a/python/pyarrow/parquet/__init__.py
+++ b/python/pyarrow/parquet/__init__.py
@@ -2926,8 +2926,10 @@ def _mkdir_if_not_exists(fs, path):
 
 def write_to_dataset(table, root_path, partition_cols=None,
                      partition_filename_cb=None, filesystem=None,
-                     use_legacy_dataset=None, **kwargs):
-    """Wrapper around parquet.write_table for writing a Table to
+                     use_legacy_dataset=None, schema=None,
+                     partitioning=None, basename_template=None,
+                     use_threads=None, file_visitor=None, **kwargs):
+    """Wrapper around dataset.write_dataset for writing a Table to
     Parquet format by partitions.
     For each combination of partition columns and values,
     a subdirectories are created in the following
@@ -2962,11 +2964,44 @@ def write_to_dataset(table, root_path, partition_cols=None,
         and allow you to override the partition filename. If nothing is
         passed, the filename will consist of a uuid.
     use_legacy_dataset : bool
-        Default is True unless a ``pyarrow.fs`` filesystem is passed.
-        Set to False to enable the new code path (experimental, using the
-        new Arrow Dataset API). This is more efficient when using partition
-        columns, but does not (yet) support `partition_filename_cb` and
-        `metadata_collector` keywords.
+        Default is False. Set to True to use the legacy behaviour
+        (this option is deprecated, and the legacy implementation will be
+        removed in a future version). The legacy implementation still
+        supports `partition_filename_cb` and `metadata_collector` keywords
+        but is less efficient when using partition columns.
+    use_threads : bool, default True
+        Write files in parallel. If enabled, then maximum parallelism will
+        be used, determined by the number of available CPU cores.
+    schema : Schema, optional
+    partitioning : Partitioning or list[str], optional
+        The partitioning scheme specified with the
+        ``pyarrow.dataset.partitioning()`` function or a list of field names.
+        When providing a list of field names, you can use
+        ``partitioning_flavor`` to drive which partitioning type should be
+        used.
+    basename_template : str, optional
+        A template string used to generate basenames of written data files.
+        The token '{i}' will be replaced with an automatically incremented
+        integer. If not specified, it defaults to "guid-{i}.parquet".
+    file_visitor : function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call. This object will have both
+        a path attribute and a metadata attribute.
+
+        The path attribute will be a string containing the path to
+        the created file.
+
+        The metadata attribute will be the parquet metadata of the file.
+        This metadata will have the file path attribute set and can be used
+        to build a _metadata file. The metadata attribute will be None if
+        the format is not parquet.
+
+        Example visitor which simply collects the filenames created::
+
+            visited_paths = []
+
+            def file_visitor(written_file):
+                visited_paths.append(written_file.path)
     **kwargs : dict,
         Additional kwargs for write_table function. See docstring for
         `write_table` or `ParquetWriter` for more information.
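A minimal usage sketch of the keywords documented above (`partitioning`, `basename_template`, `use_threads`, `file_visitor`) on the new code path; the table contents, column name and output path are made up for illustration, and the underlying semantics come from ``pyarrow.dataset.write_dataset``::

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    table = pa.table({'year': [2019, 2020, 2020], 'value': [1, 2, 3]})
    visited_paths = []

    def file_visitor(written_file):
        # record the path of every data file written
        visited_paths.append(written_file.path)

    pq.write_to_dataset(
        table, root_path='dataset_root',
        # explicit Partitioning object instead of partition_cols
        partitioning=ds.partitioning(pa.schema([('year', pa.int64())]),
                                     flavor='hive'),
        # '{i}' is replaced by an automatically incremented integer
        basename_template='chunk-{i}.parquet',
        use_threads=True,
        file_visitor=file_visitor)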
@@ -2988,26 +3023,24 @@ def write_to_dataset(table, root_path, partition_cols=None,
 
     >>> import pyarrow.parquet as pq
     >>> pq.write_to_dataset(table, root_path='dataset_name_3',
-    ...                     partition_cols=['year'],
-    ...                     use_legacy_dataset=False
-    ...                     )
+    ...                     partition_cols=['year'])
     >>> pq.ParquetDataset('dataset_name_3', use_legacy_dataset=False).files
     ['dataset_name_3/year=2019/part-0.parquet', ...
 
     Write a single Parquet file into the root folder:
 
-    >>> pq.write_to_dataset(table, root_path='dataset_name_4',
-    ...                     use_legacy_dataset=False)
+    >>> pq.write_to_dataset(table, root_path='dataset_name_4')
     >>> pq.ParquetDataset('dataset_name_4/', use_legacy_dataset=False).files
     ['dataset_name_4/part-0.parquet']
     """
     if use_legacy_dataset is None:
-        # if a new filesystem is passed -> default to new implementation
-        if isinstance(filesystem, FileSystem):
-            use_legacy_dataset = False
-        # otherwise the default is still True
-        else:
+        # if partition_filename_cb is specified ->
+        # default to the old implementation
+        if partition_filename_cb:
             use_legacy_dataset = True
+        # otherwise the default is False
+        else:
+            use_legacy_dataset = False
 
     if not use_legacy_dataset:
         import pyarrow.dataset as ds
@@ -3042,13 +3075,48 @@ def file_visitor(written_file):
             part_schema = table.select(partition_cols).schema
             partitioning = ds.partitioning(part_schema, flavor="hive")
 
+        if basename_template is None:
+            basename_template = guid() + '-{i}.parquet'
+
         ds.write_dataset(
             table, root_path, filesystem=filesystem,
             format=parquet_format, file_options=write_options, schema=schema,
             partitioning=partitioning, use_threads=use_threads,
-            file_visitor=file_visitor)
+            file_visitor=file_visitor,
+            basename_template=basename_template,
+            existing_data_behavior='overwrite_or_ignore')
         return
 
+    # warnings and errors when using the legacy implementation
+    if use_legacy_dataset:
+        warnings.warn(
+            "Passing 'use_legacy_dataset=True' to get the legacy behaviour is "
+            "deprecated as of pyarrow 8.0.0, and the legacy implementation "
+            "will be removed in a future version.",
+            FutureWarning, stacklevel=2)
+    msg2 = (
+        "The '{}' argument is not supported with the legacy "
+        "implementation. To use this argument specify "
+        "'use_legacy_dataset=False' while constructing the "
+        "ParquetDataset."
+    )
+    if schema is not None:
+        raise ValueError(msg2.format("schema"))
+    if partitioning is not None:
+        raise ValueError(msg2.format("partitioning"))
+    if use_threads is not None:
+        raise ValueError(msg2.format("use_threads"))
+    if file_visitor is not None:
+        raise ValueError(msg2.format("file_visitor"))
+    if partition_filename_cb is not None:
+        warnings.warn(
+            _DEPR_MSG.format("partition_filename_cb", " Specify "
+                             "'use_legacy_dataset=False' while constructing "
+                             "the ParquetDataset, and then use the "
+                             "'basename_template' parameter instead. For "
+                             "usage see `pyarrow.dataset.write_dataset`"),
+            FutureWarning, stacklevel=2)
+
     fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem)
 
     _mkdir_if_not_exists(fs, root_path)
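The `file_visitor` docstring above mentions building a `_metadata` file from the collected metadata; a rough sketch of that pattern on the new code path, assuming an unpartitioned write so the data file schema matches the table schema (`dataset_root` is a made-up path, and ``pq.write_metadata`` is used for the summary file)::

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'a': [1, 2, 3]})
    collected_metadata = []

    def file_visitor(written_file):
        # written_file.metadata is the Parquet FileMetaData of the file
        # (None for non-parquet formats), with its file path already set
        collected_metadata.append(written_file.metadata)

    pq.write_to_dataset(table, root_path='dataset_root',
                        file_visitor=file_visitor)

    # combine the per-file metadata into a single _metadata summary file
    pq.write_metadata(table.schema, 'dataset_root/_metadata',
                      metadata_collector=collected_metadata)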
diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py
index 2534cce73c6..6326743113f 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -1305,6 +1305,7 @@ def _test_write_to_dataset_no_partitions(base_path,
     n = 5
     for i in range(n):
         pq.write_to_dataset(output_table, base_path,
+                            use_legacy_dataset=use_legacy_dataset,
                             filesystem=filesystem)
     output_files = [file for file in filesystem.ls(str(base_path))
                     if file.endswith(".parquet")]
@@ -1544,9 +1545,10 @@ def test_dataset_read_dictionary(tempdir, use_legacy_dataset):
 
     path = tempdir / "ARROW-3325-dataset"
     t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
     t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
-    # TODO pass use_legacy_dataset (need to fix unique names)
-    pq.write_to_dataset(t1, root_path=str(path))
-    pq.write_to_dataset(t2, root_path=str(path))
+    pq.write_to_dataset(t1, root_path=str(path),
+                        use_legacy_dataset=use_legacy_dataset)
+    pq.write_to_dataset(t2, root_path=str(path),
+                        use_legacy_dataset=use_legacy_dataset)
     result = pq.ParquetDataset(
         path, read_dictionary=['f0'],
@@ -1716,3 +1718,44 @@ def test_parquet_dataset_deprecated_properties(tempdir):
 
     with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
         dataset2.pieces
+
+
+@pytest.mark.dataset
+def test_parquet_write_to_dataset_deprecated_properties(tempdir):
+    table = pa.table({'a': [1, 2, 3]})
+    path = tempdir / 'data.parquet'
+
+    with pytest.warns(FutureWarning,
+                      match="Passing 'use_legacy_dataset=True'"):
+        pq.write_to_dataset(table, path, use_legacy_dataset=True)
+
+    # check also that the legacy implementation is used when
+    # partition_filename_cb is specified
+    with pytest.warns(FutureWarning,
+                      match="Passing 'use_legacy_dataset=True'"):
+        pq.write_to_dataset(table, path,
+                            partition_filename_cb=lambda x: 'filename.parquet')
+
+
+@pytest.mark.dataset
+def test_parquet_write_to_dataset_unsupported_keywords_in_legacy(tempdir):
+    table = pa.table({'a': [1, 2, 3]})
+    path = tempdir / 'data.parquet'
+
+    with pytest.raises(ValueError, match="schema"):
+        pq.write_to_dataset(table, path, use_legacy_dataset=True,
+                            schema=pa.schema([
+                                ('a', pa.int32())
+                            ]))
+
+    with pytest.raises(ValueError, match="partitioning"):
+        pq.write_to_dataset(table, path, use_legacy_dataset=True,
+                            partitioning=["a"])
+
+    with pytest.raises(ValueError, match="use_threads"):
+        pq.write_to_dataset(table, path, use_legacy_dataset=True,
+                            use_threads=False)
+
+    with pytest.raises(ValueError, match="file_visitor"):
+        pq.write_to_dataset(table, path, use_legacy_dataset=True,
+                            file_visitor=lambda x: x)
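The tests above exercise the deprecation path for `partition_filename_cb`; for callers migrating to `basename_template`, a rough before/after sketch (the table and paths are made up, and the semantics are not identical: the callback received the partition key values, whereas the template only substitutes a counter)::

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'year': [2019, 2020], 'value': [1, 2]})

    # deprecated: forces the legacy implementation and emits a FutureWarning
    pq.write_to_dataset(table, root_path='dataset_old',
                        partition_cols=['year'],
                        partition_filename_cb=lambda keys: 'data.parquet')

    # replacement on the new code path: '{i}' is an incrementing counter
    pq.write_to_dataset(table, root_path='dataset_new',
                        partition_cols=['year'],
                        basename_template='data-{i}.parquet')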
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 9a7f5ea213a..19448d36870 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -971,7 +971,7 @@ def _create_dataset_for_fragments(tempdir, chunk_size=None, filesystem=None):
     path = str(tempdir / "test_parquet_dataset")
     # write_to_dataset currently requires pandas
-    pq.write_to_dataset(table, path,
+    pq.write_to_dataset(table, path, use_legacy_dataset=True,
                         partition_cols=["part"], chunk_size=chunk_size)
     dataset = ds.dataset(
         path, format="parquet", partitioning="hive", filesystem=filesystem
     )
@@ -1247,7 +1247,8 @@ def _create_dataset_all_types(tempdir, chunk_size=None):
 
     path = str(tempdir / "test_parquet_dataset_all_types")
     # write_to_dataset currently requires pandas
-    pq.write_to_dataset(table, path, chunk_size=chunk_size)
+    pq.write_to_dataset(table, path, use_legacy_dataset=True,
+                        chunk_size=chunk_size)
 
     return table, ds.dataset(path, format="parquet", partitioning="hive")