40 changes: 34 additions & 6 deletions python/pyarrow/parquet/__init__.py
@@ -2929,8 +2929,10 @@ def write_to_dataset(table, root_path, partition_cols=None,
partition_filename_cb=None, filesystem=None,
use_legacy_dataset=None, schema=None,
partitioning=None, basename_template=None,
use_threads=None, file_visitor=None, **kwargs):
"""Wrapper around parquet.write_dataset for writing a Table to
use_threads=None, file_visitor=None,
existing_data_behavior=None,
**kwargs):
"""Wrapper around parquet.write_table for writing a Table to
Parquet format by partitions.
For each combination of partition columns and values,
a subdirectory is created in the following
@@ -2968,8 +2970,8 @@ def write_to_dataset(table, root_path, partition_cols=None,
Default is False. Set to True to use the legacy behaviour
(this option is deprecated, and the legacy implementation will be
removed in a future version). The legacy implementation still
supports `partition_filename_cb` and `metadata_collector` keywords
but is less efficient when using partition columns.
supports the `partition_filename_cb` keyword but is less efficient
when using partition columns.
use_threads : bool, default True
Write files in parallel. If enabled, then maximum parallelism will be
used, determined by the number of available CPU cores.
@@ -2983,7 +2985,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
basename_template : str, optional
A template string used to generate basenames of written data files.
The token '{i}' will be replaced with an automatically incremented
integer. If not specified, it defaults to "guid-{i}.parquet"
integer. If not specified, it defaults to "guid-{i}.parquet".
file_visitor : function
If set, this function will be called with a WrittenFile instance
for each file created during the call. This object will have both
@@ -3003,6 +3005,28 @@ def write_to_dataset(table, root_path, partition_cols=None,

def file_visitor(written_file):
visited_paths.append(written_file.path)
existing_data_behavior : 'overwrite_or_ignore' | 'error' | \
'delete_matching'
Controls how the dataset will handle data that already exists in
the destination. The default behaviour is 'overwrite_or_ignore'.

Only supported by the new code path based on the Arrow Dataset API
(``use_legacy_dataset=False``). If the legacy implementation is
selected, passing this keyword raises a ValueError, as the old
implementation only supports the default behaviour.

'overwrite_or_ignore' will ignore any existing data and will
overwrite files with the same name as an output file. Other
existing files will be ignored. This behavior, in combination
with a unique basename_template for each write, will allow for
an append workflow.

'error' will raise an error if any data exists in the destination.

'delete_matching' is useful when you are writing a partitioned
dataset. The first time each partition directory is encountered
the entire directory will be deleted. This allows you to overwrite
old partitions completely.
**kwargs : dict,
Additional kwargs for write_table function. See docstring for
`write_table` or `ParquetWriter` for more information.
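To illustrate the keyword documented above, here is a minimal usage sketch on the non-legacy code path; the root directory name and column values are placeholders, not part of the patch.

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"year": [2020, 2020, 2021], "value": [1.0, 2.0, 3.0]})

# 'delete_matching' clears each partition directory the first time it is
# written to in this call, so old files for those partitions are replaced.
pq.write_to_dataset(
    table,
    root_path="dataset_root",        # placeholder output directory
    partition_cols=["year"],
    existing_data_behavior="delete_matching",
)
```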
@@ -3078,14 +3102,16 @@ def file_visitor(written_file):

if basename_template is None:
basename_template = guid() + '-{i}.parquet'
if existing_data_behavior is None:
existing_data_behavior = 'overwrite_or_ignore'

ds.write_dataset(
table, root_path, filesystem=filesystem,
format=parquet_format, file_options=write_options, schema=schema,
partitioning=partitioning, use_threads=use_threads,
file_visitor=file_visitor,
basename_template=basename_template,
existing_data_behavior='overwrite_or_ignore')
existing_data_behavior=existing_data_behavior)
return

# warnings and errors when using legacy implementation
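As a hedged sketch of the default path shown in this hunk: when the keyword is omitted it falls back to 'overwrite_or_ignore', which together with the GUID-based basename_template gives an append-style workflow (the root path below is a placeholder).

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Two separate writes to the same root; each call generates a fresh
# GUID-based basename_template, so the second write adds new files
# instead of overwriting the first batch.
pq.write_to_dataset(pa.table({"year": [2020], "v": [1.0]}),
                    root_path="dataset_root", partition_cols=["year"])
pq.write_to_dataset(pa.table({"year": [2020], "v": [2.0]}),
                    root_path="dataset_root", partition_cols=["year"])

# Both rows should now be readable back from the partitioned dataset.
print(pq.read_table("dataset_root"))
```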
@@ -3109,6 +3135,8 @@ def file_visitor(written_file):
raise ValueError(msg2.format("use_threads"))
if file_visitor is not None:
raise ValueError(msg2.format("file_visitor"))
if existing_data_behavior is not None:
raise ValueError(msg2.format("existing_data_behavior"))
if partition_filename_cb is not None:
warnings.warn(
_DEPR_MSG.format("partition_filename_cb", " Specify "
3 changes: 3 additions & 0 deletions python/pyarrow/tests/parquet/test_dataset.py
@@ -1764,3 +1764,6 @@ def test_parquet_write_to_dataset_unsupported_keywards_in_legacy(tempdir):
with pytest.raises(ValueError, match="file_visitor"):
pq.write_to_dataset(table, path, use_legacy_dataset=True,
file_visitor=lambda x: x)
with pytest.raises(ValueError, match="existing_data_behavior"):
pq.write_to_dataset(table, path, use_legacy_dataset=True,
existing_data_behavior='error')
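Finally, a sketch of the 'error' mode on the new code path, complementing the legacy-path test above; the destination directory is a placeholder and the exact exception type is not pinned down here.

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"year": [2020], "value": [1.0]})

# First write succeeds because the destination is empty.
pq.write_to_dataset(table, root_path="fresh_root",
                    existing_data_behavior="error")

# 'error' refuses to write once any data exists in the destination,
# so the second call is expected to fail.
try:
    pq.write_to_dataset(table, root_path="fresh_root",
                        existing_data_behavior="error")
except Exception as exc:  # exact exception class not asserted here
    print("second write rejected:", exc)
```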