16 changes: 15 additions & 1 deletion python/pyarrow/_dataset.pyx
@@ -3364,7 +3364,8 @@ def _filesystemdataset_write(
    Partitioning partitioning not None,
    FileWriteOptions file_options not None,
    int max_partitions,
    object file_visitor,
    str existing_data_behavior not None
):
    """
    CFileSystemDataset.Write wrapper
@@ -3381,6 +3382,19 @@ def _filesystemdataset_write(
    c_options.partitioning = partitioning.unwrap()
    c_options.max_partitions = max_partitions
    c_options.basename_template = tobytes(basename_template)
    if existing_data_behavior == 'error':
        c_options.existing_data_behavior = ExistingDataBehavior_ERROR
    elif existing_data_behavior == 'overwrite_or_ignore':
        c_options.existing_data_behavior = \
            ExistingDataBehavior_OVERWRITE_OR_IGNORE
    elif existing_data_behavior == 'delete_matching':
        c_options.existing_data_behavior = ExistingDataBehavior_DELETE_MATCHING
    else:
        raise ValueError(
            "existing_data_behavior must be one of 'error', "
            "'overwrite_or_ignore' or 'delete_matching'"
        )

    if file_visitor is not None:
        visit_args = {'base_dir': c_options.base_dir,
                      'file_visitor': file_visitor}
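As a small design note on the validation added above: the same string-to-enum dispatch could be written with a lookup table instead of an if/elif chain. A plain-Python sketch of that pattern follows; the helper name and the use of strings in place of the Cython enum constants are illustrative, not part of the PR.

```python
# Illustrative only: lookup-table variant of the if/elif dispatch above.
# Strings stand in for the Cython enum constants, which are not
# importable from pure Python.
_EXISTING_DATA_BEHAVIORS = {
    'error': 'ExistingDataBehavior_ERROR',
    'overwrite_or_ignore': 'ExistingDataBehavior_OVERWRITE_OR_IGNORE',
    'delete_matching': 'ExistingDataBehavior_DELETE_MATCHING',
}


def _resolve_existing_data_behavior(name):
    # Map the user-facing string onto the enum value, raising the same
    # ValueError as the if/elif chain for unknown names.
    try:
        return _EXISTING_DATA_BEHAVIORS[name]
    except KeyError:
        raise ValueError(
            "existing_data_behavior must be one of 'error', "
            "'overwrite_or_ignore' or 'delete_matching'") from None
```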
21 changes: 19 additions & 2 deletions python/pyarrow/dataset.py
@@ -736,7 +736,8 @@ def _ensure_write_partitioning(part, schema, flavor):
def write_dataset(data, base_dir, basename_template=None, format=None,
                  partitioning=None, partitioning_flavor=None, schema=None,
                  filesystem=None, file_options=None, use_threads=True,
                  max_partitions=None, file_visitor=None,
                  existing_data_behavior='error'):
Review comments:

Member: Unrelated to this PR, but I would expect most of the parameters to be declared keyword-only. @jorisvandenbossche Thoughts?

Member: Yes, that would indeed be good. But maybe let's leave that for another (non 6.0.1) PR? (could already mark the new keyword here)

Member: Yes, definitely make it a separate JIRA.

Member: Thanks!

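For context on the keyword-only suggestion in the review comments above, this is roughly what the deferred change would look like; a hedged sketch only, not part of this PR.

```python
# Hypothetical illustration: the keyword-only refactor discussed above was
# deferred to a separate JIRA and is not part of this diff. The `*` forces
# every parameter after base_dir to be passed by keyword.
def write_dataset(data, base_dir, *, basename_template=None, format=None,
                  partitioning=None, partitioning_flavor=None, schema=None,
                  filesystem=None, file_options=None, use_threads=True,
                  max_partitions=None, file_visitor=None,
                  existing_data_behavior='error'):
    ...
```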
"""
Write a dataset to a given format and partitioning.

Expand Down Expand Up @@ -798,6 +799,22 @@ def write_dataset(data, base_dir, basename_template=None, format=None,

            def file_visitor(written_file):
                visited_paths.append(written_file.path)
    existing_data_behavior : 'error' | 'overwrite_or_ignore' | \
'delete_matching'
        Controls how the dataset will handle data that already exists in
        the destination. The default behavior ('error') is to raise an error
        if any data exists in the destination.

        'overwrite_or_ignore' will ignore any existing data and will
        overwrite files with the same name as an output file. Other
        existing files will be ignored. This behavior, in combination
        with a unique basename_template for each write, will allow for
        an append workflow.

        'delete_matching' is useful when you are writing a partitioned
        dataset. The first time each partition directory is encountered
        the entire directory will be deleted. This allows you to overwrite
        old partitions completely.
"""
from pyarrow.fs import _resolve_filesystem_and_path

Expand Down Expand Up @@ -860,5 +877,5 @@ def file_visitor(written_file):

    _filesystemdataset_write(
        scanner, base_dir, basename_template, filesystem, partitioning,
        file_options, max_partitions, file_visitor, existing_data_behavior
    )
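To make the new option concrete, here is a minimal usage sketch based on the docstring above; the output directory, table contents, and basename_template are illustrative, not taken from the PR.

```python
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]})
part = ds.partitioning(pa.schema([('c', pa.int64())]), flavor='hive')

# Default behavior ('error'): a write into a destination that already
# contains data raises.
ds.write_dataset(table, 'out', format='ipc', partitioning=part)

# Append-style workflow: keep existing files and only overwrite name
# collisions, so a unique basename_template per write accumulates data.
ds.write_dataset(table, 'out', format='ipc', partitioning=part,
                 basename_template='batch-1-{i}.arrow',
                 existing_data_behavior='overwrite_or_ignore')

# Replace whole partitions: each partition directory touched by this write
# is deleted the first time it is encountered, then rewritten.
ds.write_dataset(table, 'out', format='ipc', partitioning=part,
                 existing_data_behavior='delete_matching')
```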
9 changes: 9 additions & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -86,6 +86,14 @@ ctypedef void cb_writer_finish(dict, CFileWriter*)

cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:

    cdef enum ExistingDataBehavior" arrow::dataset::ExistingDataBehavior":
        ExistingDataBehavior_DELETE_MATCHING" \
            arrow::dataset::ExistingDataBehavior::kDeleteMatchingPartitions"
        ExistingDataBehavior_OVERWRITE_OR_IGNORE" \
            arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore"
        ExistingDataBehavior_ERROR" \
            arrow::dataset::ExistingDataBehavior::kError"

    cdef cppclass CScanOptions "arrow::dataset::ScanOptions":
        @staticmethod
        shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema)
@@ -278,6 +286,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
        c_string basename_template
        function[cb_writer_finish_internal] writer_pre_finish
        function[cb_writer_finish_internal] writer_post_finish
        ExistingDataBehavior existing_data_behavior

    cdef cppclass CFileSystemDataset \
            "arrow::dataset::FileSystemDataset"(CDataset):
49 changes: 49 additions & 0 deletions python/pyarrow/tests/test_dataset.py
@@ -3481,6 +3481,55 @@ def test_write_dataset_with_dataset(tempdir):
    assert dict(load_back_table.to_pydict()) == table.to_pydict()


@pytest.mark.pandas
def test_write_dataset_existing_data(tempdir):
    directory = tempdir / 'ds'
    table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]})
    partitioning = ds.partitioning(schema=pa.schema(
        [pa.field('c', pa.int64())]), flavor='hive')

    def compare_tables_ignoring_order(t1, t2):
        df1 = t1.to_pandas().sort_values('b').reset_index(drop=True)
        df2 = t2.to_pandas().sort_values('b').reset_index(drop=True)
        assert df1.equals(df2)

    # First write is ok
    ds.write_dataset(table, directory, partitioning=partitioning, format='ipc')

    table = pa.table({'b': ['a', 'b', 'c'], 'c': [2, 3, 4]})

    # Second write should fail because the destination already contains data
    with pytest.raises(pa.ArrowInvalid):
        ds.write_dataset(table, directory,
                         partitioning=partitioning, format='ipc')

    extra_table = pa.table({'b': ['e']})
    extra_file = directory / 'c=2' / 'foo.arrow'
    pyarrow.feather.write_feather(extra_table, extra_file)

    # Should be ok: 'overwrite_or_ignore' overwrites name collisions and
    # leaves other existing files (like extra_file) in place
    ds.write_dataset(table, directory, partitioning=partitioning,
                     format='ipc',
                     existing_data_behavior='overwrite_or_ignore')

    overwritten = pa.table(
        {'b': ['e', 'x', 'a', 'b', 'c'], 'c': [2, 1, 2, 3, 4]})
    readback = ds.dataset(tempdir, format='ipc',
                          partitioning=partitioning).to_table()
    compare_tables_ignoring_order(readback, overwritten)
    assert extra_file.exists()

    # Should be ok: 'delete_matching' clears each partition directory before
    # writing into it, so the stray extra_file is removed
    ds.write_dataset(table, directory, partitioning=partitioning,
                     format='ipc', existing_data_behavior='delete_matching')

    overwritten = pa.table({'b': ['x', 'a', 'b', 'c'], 'c': [1, 2, 3, 4]})
    readback = ds.dataset(tempdir, format='ipc',
                          partitioning=partitioning).to_table()
    compare_tables_ignoring_order(readback, overwritten)
    assert not extra_file.exists()


@pytest.mark.parquet
@pytest.mark.pandas
def test_write_dataset_partitioned_dict(tempdir):