15 changes: 14 additions & 1 deletion python/pyarrow/_dataset.pyx
@@ -3364,7 +3364,8 @@ def _filesystemdataset_write(
    Partitioning partitioning not None,
    FileWriteOptions file_options not None,
    int max_partitions,
    object file_visitor,
    str existing_data_behavior not None
):
    """
    CFileSystemDataset.Write wrapper
@@ -3381,6 +3382,18 @@ def _filesystemdataset_write(
    c_options.partitioning = partitioning.unwrap()
    c_options.max_partitions = max_partitions
    c_options.basename_template = tobytes(basename_template)
    if existing_data_behavior == 'error':
        c_options.existing_data_behavior = ExistingDataBehavior_ERROR
    elif existing_data_behavior == 'overwrite':
        c_options.existing_data_behavior = ExistingDataBehavior_OVERWRITE
    elif existing_data_behavior == 'delete_matching':
        c_options.existing_data_behavior = \
            ExistingDataBehavior_DELETE_MATCHING
    else:
        raise ValueError(
            "existing_data_behavior must be one of 'error', 'overwrite' "
            "or 'delete_matching'"
        )

    if file_visitor is not None:
        visit_args = {'base_dir': c_options.base_dir,
                      'file_visitor': file_visitor}
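A design note on the string-to-enum dispatch above: an alternative would be a single mapping that drives both the enum lookup and the error message, so the two cannot drift apart. A minimal sketch of that variant, not part of this PR (the `_EXISTING_DATA_BEHAVIORS` name is hypothetical):

    # Hypothetical alternative to the if/elif chain above: one mapping
    # drives both the enum lookup and the error message.
    _EXISTING_DATA_BEHAVIORS = {
        'error': ExistingDataBehavior_ERROR,
        'overwrite': ExistingDataBehavior_OVERWRITE,
        'delete_matching': ExistingDataBehavior_DELETE_MATCHING,
    }

    behavior = _EXISTING_DATA_BEHAVIORS.get(existing_data_behavior)
    if behavior is None:
        raise ValueError(
            "existing_data_behavior must be one of "
            + ", ".join(repr(k) for k in _EXISTING_DATA_BEHAVIORS))
    c_options.existing_data_behavior = <ExistingDataBehavior>behavior

The if/elif chain in the diff is arguably clearer for three values; a mapping only pays off as the option list grows.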
17 changes: 15 additions & 2 deletions python/pyarrow/dataset.py
@@ -736,7 +736,8 @@ def _ensure_write_partitioning(part, schema, flavor):
def write_dataset(data, base_dir, basename_template=None, format=None,
                  partitioning=None, partitioning_flavor=None, schema=None,
                  filesystem=None, file_options=None, use_threads=True,
                  max_partitions=None, file_visitor=None,
                  existing_data_behavior='error'):
    """
    Write a dataset to a given format and partitioning.

@@ -798,6 +799,18 @@ def write_dataset(data, base_dir, basename_template=None, format=None,

        def file_visitor(written_file):
            visited_paths.append(written_file.path)
    existing_data_behavior : 'error' | 'overwrite' | 'delete_matching'
        Controls how the dataset will handle data that already exists in
        the destination. The default behavior ('error') is to raise an
        error if any data exists in the destination.

        'overwrite' will ignore any existing data and will overwrite
        files with the same name as an output file; other existing
        files are left untouched.

        'delete_matching' is useful when you are writing a partitioned
        dataset. The first time each partition directory is encountered
        the entire directory will be deleted. This allows you to
        overwrite old partitions completely.
    """
    from pyarrow.fs import _resolve_filesystem_and_path

@@ -860,5 +873,5 @@ def file_visitor(written_file):

    _filesystemdataset_write(
        scanner, base_dir, basename_template, filesystem, partitioning,
        file_options, max_partitions, file_visitor, existing_data_behavior
    )
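The three modes documented above can be exercised end to end as follows; a minimal usage sketch (the destination path and table contents are invented for the example):

    import pyarrow as pa
    import pyarrow.dataset as ds

    table = pa.table({'year': [2020, 2021], 'value': [1.0, 2.0]})
    part = ds.partitioning(pa.schema([('year', pa.int64())]), flavor='hive')

    # Empty destination: the first write always succeeds.
    ds.write_dataset(table, '/tmp/demo_ds', format='ipc', partitioning=part)

    # Default 'error': writing again into a non-empty destination raises.
    # ds.write_dataset(table, '/tmp/demo_ds', format='ipc', partitioning=part)

    # 'overwrite': replace files whose names collide with the new output,
    # leaving any other existing files in place.
    ds.write_dataset(table, '/tmp/demo_ds', format='ipc', partitioning=part,
                     existing_data_behavior='overwrite')

    # 'delete_matching': wipe each partition directory the first time it
    # is written to, so stale files from earlier writes cannot linger.
    ds.write_dataset(table, '/tmp/demo_ds', format='ipc', partitioning=part,
                     existing_data_behavior='delete_matching')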
9 changes: 9 additions & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -86,6 +86,14 @@ ctypedef void cb_writer_finish(dict, CFileWriter*)

cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:

    cdef enum ExistingDataBehavior" arrow::dataset::ExistingDataBehavior":
        ExistingDataBehavior_DELETE_MATCHING" \
            arrow::dataset::ExistingDataBehavior::kDeleteMatchingPartitions"
        ExistingDataBehavior_OVERWRITE" \
            arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore"
        ExistingDataBehavior_ERROR" \
            arrow::dataset::ExistingDataBehavior::kError"

    cdef cppclass CScanOptions "arrow::dataset::ScanOptions":
        @staticmethod
        shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema)
@@ -278,6 +286,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
        c_string basename_template
        function[cb_writer_finish_internal] writer_pre_finish
        function[cb_writer_finish_internal] writer_post_finish
        ExistingDataBehavior existing_data_behavior

    cdef cppclass CFileSystemDataset \
            "arrow::dataset::FileSystemDataset"(CDataset):
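A note on the binding pattern in this `.pxd`: each Cython-level name is followed by a verbatim C++ string, so the generated code uses the fully qualified scoped-enum member (e.g. `arrow::dataset::ExistingDataBehavior::kError`) while `.pyx` code refers to the flat alias (`ExistingDataBehavior_ERROR`). A minimal sketch of the same technique against a made-up header (`demo.h`, `demo::Color`, and its members are invented for illustration):

    # Hypothetical .pxd snippet: expose a scoped C++ enum under flat names.
    cdef extern from "demo.h" namespace "demo" nogil:
        cdef enum Color" demo::Color":
            Color_RED" demo::Color::kRed"
            Color_BLUE" demo::Color::kBlue"

Because the verbatim strings carry the full qualification, the flat names can be used from Cython code without any C++ scoping syntax.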
48 changes: 48 additions & 0 deletions python/pyarrow/tests/test_dataset.py
@@ -3480,6 +3480,54 @@ def test_write_dataset_with_dataset(tempdir):
    assert dict(load_back_table.to_pydict()) == table.to_pydict()


@pytest.mark.pandas
def test_write_dataset_existing_data(tempdir):
    directory = tempdir / 'ds'
    table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]})
    partitioning = ds.partitioning(schema=pa.schema(
        [pa.field('c', pa.int64())]), flavor='hive')

    def compare_tables_ignoring_order(t1, t2):
        df1 = t1.to_pandas().sort_values('b', ignore_index=True)
        df2 = t2.to_pandas().sort_values('b', ignore_index=True)
        assert df1.equals(df2)

    # First write is ok
    ds.write_dataset(table, directory, partitioning=partitioning, format='ipc')

    table = pa.table({'b': ['a', 'b', 'c'], 'c': [2, 3, 4]})

    # Second write should fail
    with pytest.raises(pa.ArrowInvalid):
        ds.write_dataset(table, directory,
                         partitioning=partitioning, format='ipc')

    extra_table = pa.table({'b': ['e']})
    extra_file = directory / 'c=2' / 'foo.arrow'
    pyarrow.feather.write_feather(extra_table, extra_file)

    # Should be ok and overwrite with 'overwrite' behavior
    ds.write_dataset(table, directory, partitioning=partitioning,
                     format='ipc', existing_data_behavior='overwrite')

    overwritten = pa.table(
        {'b': ['e', 'x', 'a', 'b', 'c'], 'c': [2, 1, 2, 3, 4]})
    readback = ds.dataset(tempdir, format='ipc',
                          partitioning=partitioning).to_table()
    compare_tables_ignoring_order(readback, overwritten)
    assert extra_file.exists()

    # Should delete matching partitions with 'delete_matching' behavior
    ds.write_dataset(table, directory, partitioning=partitioning,
                     format='ipc', existing_data_behavior='delete_matching')

    overwritten = pa.table({'b': ['x', 'a', 'b', 'c'], 'c': [1, 2, 3, 4]})
    readback = ds.dataset(tempdir, format='ipc',
                          partitioning=partitioning).to_table()
    compare_tables_ignoring_order(readback, overwritten)
    assert not extra_file.exists()


@pytest.mark.parquet
@pytest.mark.pandas
def test_write_dataset_partitioned_dict(tempdir):