diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 42d702095c2..459c3b8fb76 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -3364,7 +3364,8 @@ def _filesystemdataset_write(
     Partitioning partitioning not None,
     FileWriteOptions file_options not None,
     int max_partitions,
-    object file_visitor
+    object file_visitor,
+    str existing_data_behavior not None
 ):
     """
     CFileSystemDataset.Write wrapper
@@ -3381,6 +3382,19 @@ def _filesystemdataset_write(
     c_options.partitioning = partitioning.unwrap()
     c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
+    if existing_data_behavior == 'error':
+        c_options.existing_data_behavior = ExistingDataBehavior_ERROR
+    elif existing_data_behavior == 'overwrite_or_ignore':
+        c_options.existing_data_behavior = \
+            ExistingDataBehavior_OVERWRITE_OR_IGNORE
+    elif existing_data_behavior == 'delete_matching':
+        c_options.existing_data_behavior = ExistingDataBehavior_DELETE_MATCHING
+    else:
+        raise ValueError(
+            "existing_data_behavior must be one of 'error', "
+            "'overwrite_or_ignore' or 'delete_matching'"
+        )
+
     if file_visitor is not None:
         visit_args = {'base_dir': c_options.base_dir,
                       'file_visitor': file_visitor}
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 70aeb150b1b..42515a9f4bd 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -736,7 +736,8 @@ def _ensure_write_partitioning(part, schema, flavor):
 def write_dataset(data, base_dir, basename_template=None, format=None,
                   partitioning=None, partitioning_flavor=None, schema=None,
                   filesystem=None, file_options=None, use_threads=True,
-                  max_partitions=None, file_visitor=None):
+                  max_partitions=None, file_visitor=None,
+                  existing_data_behavior='error'):
     """
     Write a dataset to a given format and partitioning.
 
@@ -798,6 +799,22 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
 
             def file_visitor(written_file):
                 visited_paths.append(written_file.path)
+    existing_data_behavior : 'error' | 'overwrite_or_ignore' | \
+'delete_matching'
+        Controls how the dataset will handle data that already exists in
+        the destination. The default behavior ('error') is to raise an error
+        if any data exists in the destination.
+
+        'overwrite_or_ignore' will ignore any existing data and will
+        overwrite files with the same name as an output file. Other
+        existing files will be ignored. This behavior, in combination
+        with a unique basename_template for each write, will allow for
+        an append workflow.
+
+        'delete_matching' is useful when you are writing a partitioned
+        dataset. The first time each partition directory is encountered
+        the entire directory will be deleted. This allows you to overwrite
+        old partitions completely.
""" from pyarrow.fs import _resolve_filesystem_and_path @@ -860,5 +877,5 @@ def file_visitor(written_file): _filesystemdataset_write( scanner, base_dir, basename_template, filesystem, partitioning, - file_options, max_partitions, file_visitor + file_options, max_partitions, file_visitor, existing_data_behavior ) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 2527cde62b0..abc79fea813 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -86,6 +86,14 @@ ctypedef void cb_writer_finish(dict, CFileWriter*) cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: + cdef enum ExistingDataBehavior" arrow::dataset::ExistingDataBehavior": + ExistingDataBehavior_DELETE_MATCHING" \ + arrow::dataset::ExistingDataBehavior::kDeleteMatchingPartitions" + ExistingDataBehavior_OVERWRITE_OR_IGNORE" \ + arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore" + ExistingDataBehavior_ERROR" \ + arrow::dataset::ExistingDataBehavior::kError" + cdef cppclass CScanOptions "arrow::dataset::ScanOptions": @staticmethod shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema) @@ -278,6 +286,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: c_string basename_template function[cb_writer_finish_internal] writer_pre_finish function[cb_writer_finish_internal] writer_post_finish + ExistingDataBehavior existing_data_behavior cdef cppclass CFileSystemDataset \ "arrow::dataset::FileSystemDataset"(CDataset): diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index b4959512f11..7d539551b65 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3481,6 +3481,55 @@ def test_write_dataset_with_dataset(tempdir): assert dict(load_back_table.to_pydict()) == table.to_pydict() +@pytest.mark.pandas +def test_write_dataset_existing_data(tempdir): + directory = tempdir / 'ds' + table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]}) + partitioning = ds.partitioning(schema=pa.schema( + [pa.field('c', pa.int64())]), flavor='hive') + + def compare_tables_ignoring_order(t1, t2): + df1 = t1.to_pandas().sort_values('b').reset_index(drop=True) + df2 = t2.to_pandas().sort_values('b').reset_index(drop=True) + assert df1.equals(df2) + + # First write is ok + ds.write_dataset(table, directory, partitioning=partitioning, format='ipc') + + table = pa.table({'b': ['a', 'b', 'c'], 'c': [2, 3, 4]}) + + # Second write should fail + with pytest.raises(pa.ArrowInvalid): + ds.write_dataset(table, directory, + partitioning=partitioning, format='ipc') + + extra_table = pa.table({'b': ['e']}) + extra_file = directory / 'c=2' / 'foo.arrow' + pyarrow.feather.write_feather(extra_table, extra_file) + + # Should be ok and overwrite with overwrite behavior + ds.write_dataset(table, directory, partitioning=partitioning, + format='ipc', + existing_data_behavior='overwrite_or_ignore') + + overwritten = pa.table( + {'b': ['e', 'x', 'a', 'b', 'c'], 'c': [2, 1, 2, 3, 4]}) + readback = ds.dataset(tempdir, format='ipc', + partitioning=partitioning).to_table() + compare_tables_ignoring_order(readback, overwritten) + assert extra_file.exists() + + # Should be ok and delete matching with delete_matching + ds.write_dataset(table, directory, partitioning=partitioning, + format='ipc', existing_data_behavior='delete_matching') + + overwritten = pa.table({'b': ['x', 'a', 'b', 'c'], 'c': [1, 2, 3, 4]}) + readback = ds.dataset(tempdir, 
+                          partitioning=partitioning).to_table()
+    compare_tables_ignoring_order(readback, overwritten)
+    assert not extra_file.exists()
+
+
 @pytest.mark.parquet
 @pytest.mark.pandas
 def test_write_dataset_partitioned_dict(tempdir):
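
A short usage sketch of the new keyword, assuming this patch is applied; the dataset path, schema, and basename_template below are illustrative and not taken from the patch:

import pyarrow as pa
import pyarrow.dataset as ds

part = ds.partitioning(pa.schema([('year', pa.int64())]), flavor='hive')
table = pa.table({'year': [2020, 2021], 'value': [1.0, 2.0]})

# Default ('error'): writing into a destination that already holds data raises.
ds.write_dataset(table, 'my_dataset', format='parquet', partitioning=part)

# Append-style workflow: existing files are left alone and only files sharing
# a name with new output are overwritten; a unique basename_template per write
# keeps the names distinct.
ds.write_dataset(table, 'my_dataset', format='parquet', partitioning=part,
                 basename_template='write-2-part-{i}.parquet',
                 existing_data_behavior='overwrite_or_ignore')

# Replace whole partitions: each partition directory touched by this write is
# deleted the first time it is encountered, then rewritten.
ds.write_dataset(table, 'my_dataset', format='parquet', partitioning=part,
                 existing_data_behavior='delete_matching')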