diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py
index 4e6b85f22cd..dc962c60ff8 100644
--- a/python/pyarrow/parquet/__init__.py
+++ b/python/pyarrow/parquet/__init__.py
@@ -2929,8 +2929,10 @@ def write_to_dataset(table, root_path, partition_cols=None,
                      partition_filename_cb=None, filesystem=None,
                      use_legacy_dataset=None, schema=None, partitioning=None,
                      basename_template=None,
-                     use_threads=None, file_visitor=None, **kwargs):
-    """Wrapper around parquet.write_dataset for writing a Table to
+                     use_threads=None, file_visitor=None,
+                     existing_data_behavior=None,
+                     **kwargs):
+    """Wrapper around parquet.write_table for writing a Table to
     Parquet format by partitions.
     For each combination of partition columns and values,
     a subdirectories are created in the following
@@ -2968,8 +2970,8 @@ def write_to_dataset(table, root_path, partition_cols=None,
         Default is False. Set to True to use the the legacy behaviour
         (this option is deprecated, and the legacy implementation will be
         removed in a future version). The legacy implementation still
-        supports `partition_filename_cb` and `metadata_collector` keywords
-        but is less efficient when using partition columns.
+        supports the `partition_filename_cb` keyword but is less efficient
+        when using partition columns.
     use_threads : bool, default True
         Write files in parallel. If enabled, then maximum parallelism will
         be used determined by the number of available CPU cores.
@@ -2983,7 +2985,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
     basename_template : str, optional
         A template string used to generate basenames of written data files.
         The token '{i}' will be replaced with an automatically incremented
-        integer. If not specified, it defaults to "guid-{i}.parquet"
+        integer. If not specified, it defaults to "guid-{i}.parquet".
     file_visitor : function
         If set, this function will be called with a WrittenFile instance
         for each file created during the call. This object will have both
@@ -3003,6 +3005,28 @@ def write_to_dataset(table, root_path, partition_cols=None,
 
             def file_visitor(written_file):
                 visited_paths.append(written_file.path)
+    existing_data_behavior : 'overwrite_or_ignore' | 'error' | \
+'delete_matching'
+        Controls how the dataset will handle data that already exists in
+        the destination. The default behaviour is 'overwrite_or_ignore'.
+
+        Only supported by the new code path, which uses the new Arrow
+        Dataset API (``use_legacy_dataset=False``). Passing this parameter
+        when the legacy implementation is selected raises an error, as
+        the old code path only supports the default behaviour.
+
+        'overwrite_or_ignore' will ignore any existing data and will
+        overwrite files with the same name as an output file. Other
+        existing files will be ignored. This behavior, in combination
+        with a unique basename_template for each write, will allow for
+        an append workflow.
+
+        'error' will raise an error if any data exists in the destination.
+
+        'delete_matching' is useful when you are writing a partitioned
+        dataset. The first time each partition directory is encountered
+        the entire directory will be deleted. This allows you to overwrite
+        old partitions completely.
     **kwargs : dict,
         Additional kwargs for write_table function. See docstring for
         `write_table` or `ParquetWriter` for more information.
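To illustrate the behaviours documented above, here is a minimal sketch of the append workflow that 'overwrite_or_ignore' plus a unique basename_template enables, assuming a build with this patch applied. The destination path, column names, and template strings are illustrative assumptions, not part of the patch.

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"year": [2020, 2020, 2021], "value": [1, 2, 3]})

# First write creates year=2020/ and year=2021/ partition directories.
pq.write_to_dataset(table, "/tmp/sales", partition_cols=["year"],
                    use_legacy_dataset=False,
                    basename_template="batch-0-{i}.parquet")

# A second write with a different template appends files next to the old
# ones: 'overwrite_or_ignore' (the default) leaves existing files alone.
pq.write_to_dataset(table, "/tmp/sales", partition_cols=["year"],
                    use_legacy_dataset=False,
                    basename_template="batch-1-{i}.parquet",
                    existing_data_behavior="overwrite_or_ignore")

# 'delete_matching' instead clears each partition directory it touches
# before writing, replacing the old partitions entirely.
pq.write_to_dataset(table, "/tmp/sales", partition_cols=["year"],
                    use_legacy_dataset=False,
                    basename_template="batch-2-{i}.parquet",
                    existing_data_behavior="delete_matching")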
@@ -3078,6 +3102,8 @@ def file_visitor(written_file):
 
         if basename_template is None:
             basename_template = guid() + '-{i}.parquet'
+        if existing_data_behavior is None:
+            existing_data_behavior = 'overwrite_or_ignore'
 
         ds.write_dataset(
             table, root_path, filesystem=filesystem,
@@ -3085,7 +3111,7 @@ def file_visitor(written_file):
             partitioning=partitioning, use_threads=use_threads,
             file_visitor=file_visitor,
             basename_template=basename_template,
-            existing_data_behavior='overwrite_or_ignore')
+            existing_data_behavior=existing_data_behavior)
         return
 
     # warnings and errors when using legecy implementation
@@ -3109,6 +3135,8 @@ def file_visitor(written_file):
             raise ValueError(msg2.format("use_threads"))
         if file_visitor is not None:
             raise ValueError(msg2.format("file_visitor"))
+        if existing_data_behavior is not None:
+            raise ValueError(msg2.format("existing_data_behavior"))
         if partition_filename_cb is not None:
             warnings.warn(
                 _DEPR_MSG.format("partition_filename_cb", " Specify "
diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py
index 6477132dcd3..4ab6d0ab5ca 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -1764,3 +1764,6 @@ def test_parquet_write_to_dataset_unsupported_keywards_in_legacy(tempdir):
     with pytest.raises(ValueError, match="file_visitor"):
         pq.write_to_dataset(table, path, use_legacy_dataset=True,
                             file_visitor=lambda x: x)
+    with pytest.raises(ValueError, match="existing_data_behavior"):
+        pq.write_to_dataset(table, path, use_legacy_dataset=True,
+                            existing_data_behavior='error')
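For completeness, a minimal sketch mirroring what the new test exercises, plus the 'error' mode on the new code path. The destination paths and table contents are illustrative assumptions, and the exact exception raised by the second 'error' write comes from ds.write_dataset (pa.ArrowInvalid is an assumption here, not confirmed by the patch).

import pyarrow as pa
import pyarrow.parquet as pq
import pytest

table = pa.table({"a": [1, 2, 3]})

# Legacy implementation: the keyword is rejected outright, as in the
# new test above.
with pytest.raises(ValueError, match="existing_data_behavior"):
    pq.write_to_dataset(table, "/tmp/legacy_demo", use_legacy_dataset=True,
                        existing_data_behavior="error")

# New implementation: 'error' refuses to write once data already exists.
pq.write_to_dataset(table, "/tmp/error_demo", use_legacy_dataset=False)
with pytest.raises(pa.ArrowInvalid):  # assumed exception type
    pq.write_to_dataset(table, "/tmp/error_demo", use_legacy_dataset=False,
                        existing_data_behavior="error")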